From 5549779433c1ae08ea919d2edf66176f47141739 Mon Sep 17 00:00:00 2001 From: Anton Tykhyy Date: Fri, 28 Apr 2023 16:17:09 +0300 Subject: [PATCH 1/7] Eliminate cancellation deadlock in RateLimiter implementations In System.Threading.RateLimiting.ConcurrencyLimiter and the other three implementations using a synchronously locked deque, there is a deadlock between an outstanding wait-for-lease operation being canceled by user and releasing an existing lease. The internal Release() method has to clean up wait-for-lease operations in some circumstances. This involves disposing the operation's CancellationTokenRegistration, which blocks if the registered callback is running. This callback can be invoked on another thread if the external cancellation token fires. It locks the rate limiter to return the operation's permits to the rate limiter. If the external cancellation token fires after Release() runs but before it disposes of the CancellationTokenRegistration, there will be a deadlock as Release() holds the lock while waiting for the callback to complete while the callback blocks on the lock to return its permits. This change eliminates this deadlock by moving the cleanup of any wait-for-lease operations in Release() outside the lock. --- .../RateLimiting/ConcurrencyLimiter.cs | 107 ++++++++++-------- .../RateLimiting/FixedWindowRateLimiter.cs | 104 +++++++++-------- .../RateLimiting/SlidingWindowRateLimiter.cs | 104 +++++++++-------- .../RateLimiting/TokenBucketRateLimiter.cs | 106 +++++++++-------- 4 files changed, 230 insertions(+), 191 deletions(-) diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs index 3ba07631f74bb3..a376c8c0da0d41 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs @@ -132,6 +132,8 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C return new ValueTask(SuccessfulLease); } + using var disposer = new RequestRegistration.Disposer(); + // Perf: Check SemaphoreSlim implementation instead of locking lock (Lock) { @@ -152,7 +154,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C RequestRegistration oldestRequest = _queue.DequeueHead(); _queueCount -= oldestRequest.Count; Debug.Assert(_queueCount >= 0); - if (!oldestRequest.Tcs.TrySetResult(FailedLease)) + if (!oldestRequest.TrySetResult(FailedLease)) { // Updating queue count is handled by the cancellation code _queueCount += oldestRequest.Count; @@ -161,7 +163,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C { Interlocked.Increment(ref _failedLeasesCount); } - oldestRequest.CancellationTokenRegistration.Dispose(); + disposer.Add(oldestRequest); } while (_options.QueueLimit - _queueCount < permitCount); } @@ -173,22 +175,12 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C } } - CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken); - CancellationTokenRegistration ctr = default; - if (cancellationToken.CanBeCanceled) - { - ctr = cancellationToken.Register(static obj => - { - ((CancelQueueState)obj!).TrySetCanceled(); - }, tcs); - } - - RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr); + var request = new RequestRegistration(permitCount, this, cancellationToken); _queue.EnqueueTail(request); _queueCount += permitCount; Debug.Assert(_queueCount <= _options.QueueLimit); - return new ValueTask(request.Tcs.Task); + return new ValueTask(request.Task); } } @@ -226,6 +218,7 @@ private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out Rat private void Release(int releaseCount) { + using var disposer = new RequestRegistration.Disposer(); lock (Lock) { if (_disposed) @@ -245,13 +238,13 @@ private void Release(int releaseCount) // Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued. // We just need to remove the item and let the next queued item be considered for completion. - if (nextPendingRequest.Tcs.Task.IsCompleted) + if (nextPendingRequest.Task.IsCompleted) { nextPendingRequest = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.Add(nextPendingRequest); } else if (_permitCount >= nextPendingRequest.Count) { @@ -266,7 +259,7 @@ private void Release(int releaseCount) ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count); // Check if request was canceled - if (!nextPendingRequest.Tcs.TrySetResult(lease)) + if (!nextPendingRequest.TrySetResult(lease)) { // Queued item was canceled so add count back _permitCount += nextPendingRequest.Count; @@ -277,7 +270,7 @@ private void Release(int releaseCount) { Interlocked.Increment(ref _successfulLeasesCount); } - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.Add(nextPendingRequest); Debug.Assert(_queueCount >= 0); } else @@ -302,6 +295,7 @@ protected override void Dispose(bool disposing) return; } + using var disposer = new RequestRegistration.Disposer(); lock (Lock) { if (_disposed) @@ -314,8 +308,8 @@ protected override void Dispose(bool disposing) RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - next.CancellationTokenRegistration.Dispose(); - next.Tcs.TrySetResult(FailedLease); + disposer.Add(next); + next.TrySetResult(FailedLease); } } } @@ -383,49 +377,64 @@ protected override void Dispose(bool disposing) } } - private readonly struct RequestRegistration + private sealed class RequestRegistration : TaskCompletionSource { - public RequestRegistration(int requestedCount, TaskCompletionSource tcs, - CancellationTokenRegistration cancellationTokenRegistration) - { - Count = requestedCount; - // Perf: Use AsyncOperation instead - Tcs = tcs; - CancellationTokenRegistration = cancellationTokenRegistration; - } + private readonly CancellationToken _cancellationToken; + private CancellationTokenRegistration _cancellationTokenRegistration; - public int Count { get; } + // this field is used only by the disposal mechanics and never shared between threads + private RequestRegistration? _next; - public TaskCompletionSource Tcs { get; } + public RequestRegistration(int permitCount, ConcurrencyLimiter limiter, CancellationToken cancellationToken) + : base(limiter, TaskCreationOptions.RunContinuationsAsynchronously) + { + Count = permitCount; + _cancellationToken = cancellationToken; - public CancellationTokenRegistration CancellationTokenRegistration { get; } - } + // RequestRegistration objects are created while the limiter lock is held + // if cancellationToken fires before or while the lock is held, UnsafeRegister + // is going to invoke the callback synchronously, but this does not create + // a deadlock because lock are reentrant + if (cancellationToken.CanBeCanceled) + _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this) ; + } - private sealed class CancelQueueState : TaskCompletionSource - { - private readonly int _permitCount; - private readonly ConcurrencyLimiter _limiter; - private readonly CancellationToken _cancellationToken; + public int Count { get; } - public CancelQueueState(int permitCount, ConcurrencyLimiter limiter, CancellationToken cancellationToken) - : base(TaskCreationOptions.RunContinuationsAsynchronously) + private static void Cancel(object? state) { - _permitCount = permitCount; - _limiter = limiter; - _cancellationToken = cancellationToken; + if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken)) + { + var limiter = (ConcurrencyLimiter)registration.Task.AsyncState!; + lock (limiter.Lock) + { + limiter._queueCount -= registration.Count; + } + } } - public new bool TrySetCanceled() + /// + /// Collects registrations to dispose outside the limiter lock to avoid deadlock. + /// + public struct Disposer : IDisposable { - if (TrySetCanceled(_cancellationToken)) + private RequestRegistration? _next; + + public void Add(RequestRegistration request) + { + request._next = _next; + _next = request; + } + + public void Dispose() { - lock (_limiter.Lock) + for (var current = _next; current is not null; current = current._next) { - _limiter._queueCount -= _permitCount; + current._cancellationTokenRegistration.Dispose(); } - return true; + + _next = null; } - return false; } } } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs index 037fc70dc3ab85..52c49a3cd96043 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs @@ -151,6 +151,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C return new ValueTask(SuccessfulLease); } + using var disposer = new RequestRegistration.Disposer(); lock (Lock) { if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease)) @@ -170,7 +171,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C RequestRegistration oldestRequest = _queue.DequeueHead(); _queueCount -= oldestRequest.Count; Debug.Assert(_queueCount >= 0); - if (!oldestRequest.Tcs.TrySetResult(FailedLease)) + if (!oldestRequest.TrySetResult(FailedLease)) { _queueCount += oldestRequest.Count; } @@ -178,7 +179,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C { Interlocked.Increment(ref _failedLeasesCount); } - oldestRequest.CancellationTokenRegistration.Dispose(); + disposer.Add(oldestRequest); } while (_options.QueueLimit - _queueCount < permitCount); } @@ -190,22 +191,12 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C } } - CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken); - CancellationTokenRegistration ctr = default; - if (cancellationToken.CanBeCanceled) - { - ctr = cancellationToken.Register(static obj => - { - ((CancelQueueState)obj!).TrySetCanceled(); - }, tcs); - } - - RequestRegistration registration = new RequestRegistration(permitCount, tcs, ctr); + var registration = new RequestRegistration(permitCount, this, cancellationToken); _queue.EnqueueTail(registration); _queueCount += permitCount; Debug.Assert(_queueCount <= _options.QueueLimit); - return new ValueTask(registration.Tcs.Task); + return new ValueTask(registration.Task); } } @@ -280,6 +271,8 @@ private static void Replenish(object? state) // Used in tests that test behavior with specific time intervals private void ReplenishInternal(long nowTicks) { + using var disposer = new RequestRegistration.Disposer(); + // Method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes lock (Lock) { @@ -315,13 +308,13 @@ private void ReplenishInternal(long nowTicks) // Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued. // We just need to remove the item and let the next queued item be considered for completion. - if (nextPendingRequest.Tcs.Task.IsCompleted) + if (nextPendingRequest.Task.IsCompleted) { nextPendingRequest = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.Add(nextPendingRequest); } else if (_permitCount >= nextPendingRequest.Count) { @@ -335,7 +328,7 @@ private void ReplenishInternal(long nowTicks) _permitCount -= nextPendingRequest.Count; Debug.Assert(_permitCount >= 0); - if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease)) + if (!nextPendingRequest.TrySetResult(SuccessfulLease)) { // Queued item was canceled so add count back _permitCount += nextPendingRequest.Count; @@ -346,7 +339,7 @@ private void ReplenishInternal(long nowTicks) { Interlocked.Increment(ref _successfulLeasesCount); } - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.Add(nextPendingRequest); Debug.Assert(_queueCount >= 0); } else @@ -372,6 +365,7 @@ protected override void Dispose(bool disposing) return; } + using var disposer = new RequestRegistration.Disposer(); lock (Lock) { if (_disposed) @@ -385,8 +379,8 @@ protected override void Dispose(bool disposing) RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - next.CancellationTokenRegistration.Dispose(); - next.Tcs.TrySetResult(FailedLease); + disposer.Add(next); + next.TrySetResult(FailedLease); } } } @@ -435,48 +429,64 @@ public override bool TryGetMetadata(string metadataName, out object? metadata) } } - private readonly struct RequestRegistration + private sealed class RequestRegistration : TaskCompletionSource { - public RequestRegistration(int permitCount, TaskCompletionSource tcs, CancellationTokenRegistration cancellationTokenRegistration) + private readonly CancellationToken _cancellationToken; + private CancellationTokenRegistration _cancellationTokenRegistration; + + // this field is used only by the disposal mechanics and never shared between threads + private RequestRegistration? _next; + + public RequestRegistration(int permitCount, FixedWindowRateLimiter limiter, CancellationToken cancellationToken) + : base(limiter, TaskCreationOptions.RunContinuationsAsynchronously) { Count = permitCount; - // Use VoidAsyncOperationWithData instead - Tcs = tcs; - CancellationTokenRegistration = cancellationTokenRegistration; + _cancellationToken = cancellationToken; + + // RequestRegistration objects are created while the limiter lock is held + // if cancellationToken fires before or while the lock is held, UnsafeRegister + // is going to invoke the callback synchronously, but this does not create + // a deadlock because lock are reentrant + if (cancellationToken.CanBeCanceled) + _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this) ; } public int Count { get; } - public TaskCompletionSource Tcs { get; } - - public CancellationTokenRegistration CancellationTokenRegistration { get; } - } - - private sealed class CancelQueueState : TaskCompletionSource - { - private readonly int _permitCount; - private readonly FixedWindowRateLimiter _limiter; - private readonly CancellationToken _cancellationToken; - - public CancelQueueState(int permitCount, FixedWindowRateLimiter limiter, CancellationToken cancellationToken) - : base(TaskCreationOptions.RunContinuationsAsynchronously) + private static void Cancel(object? state) { - _permitCount = permitCount; - _limiter = limiter; - _cancellationToken = cancellationToken; + if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken)) + { + var limiter = (FixedWindowRateLimiter)registration.Task.AsyncState!; + lock (limiter.Lock) + { + limiter._queueCount -= registration.Count; + } + } } - public new bool TrySetCanceled() + /// + /// Collects registrations to dispose outside the limiter lock to avoid deadlock. + /// + public struct Disposer : IDisposable { - if (TrySetCanceled(_cancellationToken)) + private RequestRegistration? _next; + + public void Add(RequestRegistration request) + { + request._next = _next; + _next = request; + } + + public void Dispose() { - lock (_limiter.Lock) + for (var current = _next; current is not null; current = current._next) { - _limiter._queueCount -= _permitCount; + current._cancellationTokenRegistration.Dispose(); } - return true; + + _next = null; } - return false; } } } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs index c2927e282c48eb..f66b59aeeeb825 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs @@ -163,6 +163,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C return new ValueTask(SuccessfulLease); } + using var disposer = new RequestRegistration.Disposer(); lock (Lock) { if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease)) @@ -182,7 +183,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C RequestRegistration oldestRequest = _queue.DequeueHead(); _queueCount -= oldestRequest.Count; Debug.Assert(_queueCount >= 0); - if (!oldestRequest.Tcs.TrySetResult(FailedLease)) + if (!oldestRequest.TrySetResult(FailedLease)) { _queueCount += oldestRequest.Count; } @@ -190,7 +191,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C { Interlocked.Increment(ref _failedLeasesCount); } - oldestRequest.CancellationTokenRegistration.Dispose(); + disposer.Add(oldestRequest); } while (_options.QueueLimit - _queueCount < permitCount); } @@ -202,22 +203,12 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C } } - CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken); - CancellationTokenRegistration ctr = default; - if (cancellationToken.CanBeCanceled) - { - ctr = cancellationToken.Register(static obj => - { - ((CancelQueueState)obj!).TrySetCanceled(); - }, tcs); - } - - RequestRegistration registration = new RequestRegistration(permitCount, tcs, ctr); + var registration = new RequestRegistration(permitCount, this, cancellationToken); _queue.EnqueueTail(registration); _queueCount += permitCount; Debug.Assert(_queueCount <= _options.QueueLimit); - return new ValueTask(registration.Tcs.Task); + return new ValueTask(registration.Task); } } @@ -286,6 +277,8 @@ private static void Replenish(object? state) // Used in tests that test behavior with specific time intervals private void ReplenishInternal(long nowTicks) { + using var disposer = new RequestRegistration.Disposer(); + // Method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes lock (Lock) { @@ -325,13 +318,13 @@ private void ReplenishInternal(long nowTicks) // Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued. // We just need to remove the item and let the next queued item be considered for completion. - if (nextPendingRequest.Tcs.Task.IsCompleted) + if (nextPendingRequest.Task.IsCompleted) { nextPendingRequest = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.Add(nextPendingRequest); } // If we have enough permits after replenishing to serve the queued requests else if (_permitCount >= nextPendingRequest.Count) @@ -347,7 +340,7 @@ private void ReplenishInternal(long nowTicks) _requestsPerSegment[_currentSegmentIndex] += nextPendingRequest.Count; Debug.Assert(_permitCount >= 0); - if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease)) + if (!nextPendingRequest.TrySetResult(SuccessfulLease)) { // Queued item was canceled so add count back _permitCount += nextPendingRequest.Count; @@ -359,7 +352,7 @@ private void ReplenishInternal(long nowTicks) { Interlocked.Increment(ref _successfulLeasesCount); } - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.Add(nextPendingRequest); Debug.Assert(_queueCount >= 0); } else @@ -385,6 +378,7 @@ protected override void Dispose(bool disposing) return; } + using var disposer = new RequestRegistration.Disposer(); lock (Lock) { if (_disposed) @@ -398,8 +392,8 @@ protected override void Dispose(bool disposing) RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - next.CancellationTokenRegistration.Dispose(); - next.Tcs.TrySetResult(FailedLease); + disposer.Add(next); + next.TrySetResult(FailedLease); } } } @@ -448,48 +442,64 @@ public override bool TryGetMetadata(string metadataName, out object? metadata) } } - private readonly struct RequestRegistration + private sealed class RequestRegistration : TaskCompletionSource { - public RequestRegistration(int permitCount, TaskCompletionSource tcs, CancellationTokenRegistration cancellationTokenRegistration) + private readonly CancellationToken _cancellationToken; + private CancellationTokenRegistration _cancellationTokenRegistration; + + // this field is used only by the disposal mechanics and never shared between threads + private RequestRegistration? _next; + + public RequestRegistration(int permitCount, SlidingWindowRateLimiter limiter, CancellationToken cancellationToken) + : base(limiter, TaskCreationOptions.RunContinuationsAsynchronously) { Count = permitCount; - // Use VoidAsyncOperationWithData instead - Tcs = tcs; - CancellationTokenRegistration = cancellationTokenRegistration; + _cancellationToken = cancellationToken; + + // RequestRegistration objects are created while the limiter lock is held + // if cancellationToken fires before or while the lock is held, UnsafeRegister + // is going to invoke the callback synchronously, but this does not create + // a deadlock because lock are reentrant + if (cancellationToken.CanBeCanceled) + _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this) ; } public int Count { get; } - public TaskCompletionSource Tcs { get; } - - public CancellationTokenRegistration CancellationTokenRegistration { get; } - } - - private sealed class CancelQueueState : TaskCompletionSource - { - private readonly int _permitCount; - private readonly SlidingWindowRateLimiter _limiter; - private readonly CancellationToken _cancellationToken; - - public CancelQueueState(int permitCount, SlidingWindowRateLimiter limiter, CancellationToken cancellationToken) - : base(TaskCreationOptions.RunContinuationsAsynchronously) + private static void Cancel(object? state) { - _permitCount = permitCount; - _limiter = limiter; - _cancellationToken = cancellationToken; + if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken)) + { + var limiter = (SlidingWindowRateLimiter)registration.Task.AsyncState!; + lock (limiter.Lock) + { + limiter._queueCount -= registration.Count; + } + } } - public new bool TrySetCanceled() + /// + /// Collects registrations to dispose outside the limiter lock to avoid deadlock. + /// + public struct Disposer : IDisposable { - if (TrySetCanceled(_cancellationToken)) + private RequestRegistration? _next; + + public void Add(RequestRegistration request) + { + request._next = _next; + _next = request; + } + + public void Dispose() { - lock (_limiter.Lock) + for (var current = _next; current is not null; current = current._next) { - _limiter._queueCount -= _permitCount; + current._cancellationTokenRegistration.Dispose(); } - return true; + + _next = null; } - return false; } } } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs index b94937600f9440..bb5381796357f5 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs @@ -156,6 +156,7 @@ protected override ValueTask AcquireAsyncCore(int tokenCount, Ca return new ValueTask(SuccessfulLease); } + using var disposer = new RequestRegistration.Disposer(); lock (Lock) { if (TryLeaseUnsynchronized(tokenCount, out RateLimitLease? lease)) @@ -175,7 +176,7 @@ protected override ValueTask AcquireAsyncCore(int tokenCount, Ca RequestRegistration oldestRequest = _queue.DequeueHead(); _queueCount -= oldestRequest.Count; Debug.Assert(_queueCount >= 0); - if (!oldestRequest.Tcs.TrySetResult(FailedLease)) + if (!oldestRequest.TrySetResult(FailedLease)) { // Updating queue count is handled by the cancellation code _queueCount += oldestRequest.Count; @@ -184,7 +185,7 @@ protected override ValueTask AcquireAsyncCore(int tokenCount, Ca { Interlocked.Increment(ref _failedLeasesCount); } - oldestRequest.CancellationTokenRegistration.Dispose(); + disposer.Add(oldestRequest); } while (_options.QueueLimit - _queueCount < tokenCount); } @@ -196,22 +197,12 @@ protected override ValueTask AcquireAsyncCore(int tokenCount, Ca } } - CancelQueueState tcs = new CancelQueueState(tokenCount, this, cancellationToken); - CancellationTokenRegistration ctr = default; - if (cancellationToken.CanBeCanceled) - { - ctr = cancellationToken.Register(static obj => - { - ((CancelQueueState)obj!).TrySetCanceled(); - }, tcs); - } - - RequestRegistration registration = new RequestRegistration(tokenCount, tcs, ctr); + var registration = new RequestRegistration(tokenCount, this, cancellationToken); _queue.EnqueueTail(registration); _queueCount += tokenCount; Debug.Assert(_queueCount <= _options.QueueLimit); - return new ValueTask(registration.Tcs.Task); + return new ValueTask(registration.Task); } } @@ -288,6 +279,8 @@ private static void Replenish(object? state) // Used in tests to avoid dealing with real time private void ReplenishInternal(long nowTicks) { + using var disposer = new RequestRegistration.Disposer(); + // method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes lock (Lock) { @@ -330,13 +323,13 @@ private void ReplenishInternal(long nowTicks) // Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued. // We just need to remove the item and let the next queued item be considered for completion. - if (nextPendingRequest.Tcs.Task.IsCompleted) + if (nextPendingRequest.Task.IsCompleted) { nextPendingRequest = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? queue.DequeueHead() : queue.DequeueTail(); - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.Add(nextPendingRequest); } else if (_tokenCount >= nextPendingRequest.Count) { @@ -350,7 +343,7 @@ private void ReplenishInternal(long nowTicks) _tokenCount -= nextPendingRequest.Count; Debug.Assert(_tokenCount >= 0); - if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease)) + if (!nextPendingRequest.TrySetResult(SuccessfulLease)) { // Queued item was canceled so add count back _tokenCount += nextPendingRequest.Count; @@ -361,7 +354,7 @@ private void ReplenishInternal(long nowTicks) { Interlocked.Increment(ref _successfulLeasesCount); } - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.Add(nextPendingRequest); Debug.Assert(_queueCount >= 0); } else @@ -387,6 +380,7 @@ protected override void Dispose(bool disposing) return; } + using var disposer = new RequestRegistration.Disposer(); lock (Lock) { if (_disposed) @@ -400,8 +394,8 @@ protected override void Dispose(bool disposing) RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - next.CancellationTokenRegistration.Dispose(); - next.Tcs.TrySetResult(FailedLease); + disposer.Add(next); + next.TrySetResult(FailedLease); } } } @@ -450,48 +444,64 @@ public override bool TryGetMetadata(string metadataName, out object? metadata) } } - private readonly struct RequestRegistration + private sealed class RequestRegistration : TaskCompletionSource { - public RequestRegistration(int tokenCount, TaskCompletionSource tcs, CancellationTokenRegistration cancellationTokenRegistration) - { - Count = tokenCount; - // Use VoidAsyncOperationWithData instead - Tcs = tcs; - CancellationTokenRegistration = cancellationTokenRegistration; - } + private readonly CancellationToken _cancellationToken; + private CancellationTokenRegistration _cancellationTokenRegistration; - public int Count { get; } + // this field is used only by the disposal mechanics and never shared between threads + private RequestRegistration? _next; - public TaskCompletionSource Tcs { get; } + public RequestRegistration(int permitCount, TokenBucketRateLimiter limiter, CancellationToken cancellationToken) + : base(limiter, TaskCreationOptions.RunContinuationsAsynchronously) + { + Count = permitCount; + _cancellationToken = cancellationToken; - public CancellationTokenRegistration CancellationTokenRegistration { get; } - } + // RequestRegistration objects are created while the limiter lock is held + // if cancellationToken fires before or while the lock is held, UnsafeRegister + // is going to invoke the callback synchronously, but this does not create + // a deadlock because lock are reentrant + if (cancellationToken.CanBeCanceled) + _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this) ; + } - private sealed class CancelQueueState : TaskCompletionSource - { - private readonly int _tokenCount; - private readonly TokenBucketRateLimiter _limiter; - private readonly CancellationToken _cancellationToken; + public int Count { get; } - public CancelQueueState(int tokenCount, TokenBucketRateLimiter limiter, CancellationToken cancellationToken) - : base(TaskCreationOptions.RunContinuationsAsynchronously) + private static void Cancel(object? state) { - _tokenCount = tokenCount; - _limiter = limiter; - _cancellationToken = cancellationToken; + if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken)) + { + var limiter = (TokenBucketRateLimiter)registration.Task.AsyncState!; + lock (limiter.Lock) + { + limiter._queueCount -= registration.Count; + } + } } - public new bool TrySetCanceled() + /// + /// Collects registrations to dispose outside the limiter lock to avoid deadlock. + /// + public struct Disposer : IDisposable { - if (TrySetCanceled(_cancellationToken)) + private RequestRegistration? _next; + + public void Add(RequestRegistration request) + { + request._next = _next; + _next = request; + } + + public void Dispose() { - lock (_limiter.Lock) + for (var current = _next; current is not null; current = current._next) { - _limiter._queueCount -= _tokenCount; + current._cancellationTokenRegistration.Dispose(); } - return true; + + _next = null; } - return false; } } } From 60050753d841ba25f44f8cb8d3d0a4c13bc8cd20 Mon Sep 17 00:00:00 2001 From: Anton Tykhyy Date: Sat, 29 Apr 2023 07:36:10 +0300 Subject: [PATCH 2/7] Fix SA1129 warnings (do not use default value type ctor) --- .../src/System/Threading/RateLimiting/ConcurrencyLimiter.cs | 6 +++--- .../System/Threading/RateLimiting/FixedWindowRateLimiter.cs | 6 +++--- .../Threading/RateLimiting/SlidingWindowRateLimiter.cs | 6 +++--- .../System/Threading/RateLimiting/TokenBucketRateLimiter.cs | 6 +++--- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs index a376c8c0da0d41..420a2bad064d46 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs @@ -132,7 +132,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C return new ValueTask(SuccessfulLease); } - using var disposer = new RequestRegistration.Disposer(); + using var disposer = default(RequestRegistration.Disposer); // Perf: Check SemaphoreSlim implementation instead of locking lock (Lock) @@ -218,7 +218,7 @@ private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out Rat private void Release(int releaseCount) { - using var disposer = new RequestRegistration.Disposer(); + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (_disposed) @@ -295,7 +295,7 @@ protected override void Dispose(bool disposing) return; } - using var disposer = new RequestRegistration.Disposer(); + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (_disposed) diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs index 52c49a3cd96043..e54cc4ae2492fa 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs @@ -151,7 +151,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C return new ValueTask(SuccessfulLease); } - using var disposer = new RequestRegistration.Disposer(); + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease)) @@ -271,7 +271,7 @@ private static void Replenish(object? state) // Used in tests that test behavior with specific time intervals private void ReplenishInternal(long nowTicks) { - using var disposer = new RequestRegistration.Disposer(); + using var disposer = default(RequestRegistration.Disposer); // Method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes lock (Lock) @@ -365,7 +365,7 @@ protected override void Dispose(bool disposing) return; } - using var disposer = new RequestRegistration.Disposer(); + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (_disposed) diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs index f66b59aeeeb825..db108d02ba8839 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs @@ -163,7 +163,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C return new ValueTask(SuccessfulLease); } - using var disposer = new RequestRegistration.Disposer(); + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease)) @@ -277,7 +277,7 @@ private static void Replenish(object? state) // Used in tests that test behavior with specific time intervals private void ReplenishInternal(long nowTicks) { - using var disposer = new RequestRegistration.Disposer(); + using var disposer = default(RequestRegistration.Disposer); // Method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes lock (Lock) @@ -378,7 +378,7 @@ protected override void Dispose(bool disposing) return; } - using var disposer = new RequestRegistration.Disposer(); + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (_disposed) diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs index bb5381796357f5..9918dc6c577fa5 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs @@ -156,7 +156,7 @@ protected override ValueTask AcquireAsyncCore(int tokenCount, Ca return new ValueTask(SuccessfulLease); } - using var disposer = new RequestRegistration.Disposer(); + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (TryLeaseUnsynchronized(tokenCount, out RateLimitLease? lease)) @@ -279,7 +279,7 @@ private static void Replenish(object? state) // Used in tests to avoid dealing with real time private void ReplenishInternal(long nowTicks) { - using var disposer = new RequestRegistration.Disposer(); + using var disposer = default(RequestRegistration.Disposer); // method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes lock (Lock) @@ -380,7 +380,7 @@ protected override void Dispose(bool disposing) return; } - using var disposer = new RequestRegistration.Disposer(); + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (_disposed) From 44503cf672001ee50cd64c02da460e9009bd935f Mon Sep 17 00:00:00 2001 From: Anton Tykhyy Date: Sat, 29 Apr 2023 09:27:01 +0300 Subject: [PATCH 3/7] Use CancellationToken.Register on full .NET framework --- .../src/System/Threading/RateLimiting/ConcurrencyLimiter.cs | 4 ++++ .../System/Threading/RateLimiting/FixedWindowRateLimiter.cs | 4 ++++ .../System/Threading/RateLimiting/SlidingWindowRateLimiter.cs | 4 ++++ .../System/Threading/RateLimiting/TokenBucketRateLimiter.cs | 4 ++++ 4 files changed, 16 insertions(+) diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs index 420a2bad064d46..7e1b9d9c31521c 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs @@ -396,7 +396,11 @@ public RequestRegistration(int permitCount, ConcurrencyLimiter limiter, Cancella // is going to invoke the callback synchronously, but this does not create // a deadlock because lock are reentrant if (cancellationToken.CanBeCanceled) +#if NETCOREAPP || NETSTANDARD2_1 _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this) ; +#else + _cancellationTokenRegistration = cancellationToken.Register(Cancel, this) ; +#endif } public int Count { get; } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs index e54cc4ae2492fa..4a607956875061 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs @@ -448,7 +448,11 @@ public RequestRegistration(int permitCount, FixedWindowRateLimiter limiter, Canc // is going to invoke the callback synchronously, but this does not create // a deadlock because lock are reentrant if (cancellationToken.CanBeCanceled) +#if NETCOREAPP || NETSTANDARD2_1 _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this) ; +#else + _cancellationTokenRegistration = cancellationToken.Register(Cancel, this) ; +#endif } public int Count { get; } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs index db108d02ba8839..6d25a43d4f21a7 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs @@ -461,7 +461,11 @@ public RequestRegistration(int permitCount, SlidingWindowRateLimiter limiter, Ca // is going to invoke the callback synchronously, but this does not create // a deadlock because lock are reentrant if (cancellationToken.CanBeCanceled) +#if NETCOREAPP || NETSTANDARD2_1 _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this) ; +#else + _cancellationTokenRegistration = cancellationToken.Register(Cancel, this) ; +#endif } public int Count { get; } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs index 9918dc6c577fa5..0acd91150032ca 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs @@ -463,7 +463,11 @@ public RequestRegistration(int permitCount, TokenBucketRateLimiter limiter, Canc // is going to invoke the callback synchronously, but this does not create // a deadlock because lock are reentrant if (cancellationToken.CanBeCanceled) +#if NETCOREAPP || NETSTANDARD2_1 _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this) ; +#else + _cancellationTokenRegistration = cancellationToken.Register(Cancel, this) ; +#endif } public int Count { get; } From 25b17d7f72f53349d628712b6908cbeedff7e03e Mon Sep 17 00:00:00 2001 From: Anton Tykhyy Date: Fri, 26 May 2023 10:17:23 +0300 Subject: [PATCH 4/7] Fix spacing, use correct conditional compilation symbol --- .../src/System/Threading/RateLimiting/ConcurrencyLimiter.cs | 6 +++--- .../System/Threading/RateLimiting/FixedWindowRateLimiter.cs | 6 +++--- .../Threading/RateLimiting/SlidingWindowRateLimiter.cs | 6 +++--- .../System/Threading/RateLimiting/TokenBucketRateLimiter.cs | 6 +++--- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs index 7e1b9d9c31521c..c2ba9cac9cc79e 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs @@ -396,10 +396,10 @@ public RequestRegistration(int permitCount, ConcurrencyLimiter limiter, Cancella // is going to invoke the callback synchronously, but this does not create // a deadlock because lock are reentrant if (cancellationToken.CanBeCanceled) -#if NETCOREAPP || NETSTANDARD2_1 - _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this) ; +#if NETCOREAPP || NETSTANDARD2_1_OR_GREATER + _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this); #else - _cancellationTokenRegistration = cancellationToken.Register(Cancel, this) ; + _cancellationTokenRegistration = cancellationToken.Register(Cancel, this); #endif } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs index 4a607956875061..16faeaec9f9919 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs @@ -448,10 +448,10 @@ public RequestRegistration(int permitCount, FixedWindowRateLimiter limiter, Canc // is going to invoke the callback synchronously, but this does not create // a deadlock because lock are reentrant if (cancellationToken.CanBeCanceled) -#if NETCOREAPP || NETSTANDARD2_1 - _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this) ; +#if NETCOREAPP || NETSTANDARD2_1_OR_GREATER + _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this); #else - _cancellationTokenRegistration = cancellationToken.Register(Cancel, this) ; + _cancellationTokenRegistration = cancellationToken.Register(Cancel, this); #endif } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs index 6d25a43d4f21a7..6b73c758bdf1c7 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs @@ -461,10 +461,10 @@ public RequestRegistration(int permitCount, SlidingWindowRateLimiter limiter, Ca // is going to invoke the callback synchronously, but this does not create // a deadlock because lock are reentrant if (cancellationToken.CanBeCanceled) -#if NETCOREAPP || NETSTANDARD2_1 - _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this) ; +#if NETCOREAPP || NETSTANDARD2_1_OR_GREATER + _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this); #else - _cancellationTokenRegistration = cancellationToken.Register(Cancel, this) ; + _cancellationTokenRegistration = cancellationToken.Register(Cancel, this); #endif } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs index 0acd91150032ca..f65bf70c9eb640 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs @@ -463,10 +463,10 @@ public RequestRegistration(int permitCount, TokenBucketRateLimiter limiter, Canc // is going to invoke the callback synchronously, but this does not create // a deadlock because lock are reentrant if (cancellationToken.CanBeCanceled) -#if NETCOREAPP || NETSTANDARD2_1 - _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this) ; +#if NETCOREAPP || NETSTANDARD2_1_OR_GREATER + _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this); #else - _cancellationTokenRegistration = cancellationToken.Register(Cancel, this) ; + _cancellationTokenRegistration = cancellationToken.Register(Cancel, this); #endif } From 5b3ff5c0dbf286da8d84cfd129007bc349b68d6b Mon Sep 17 00:00:00 2001 From: Anton Tykhyy Date: Sat, 10 Jun 2023 21:26:30 +0300 Subject: [PATCH 5/7] Add unit test demonstrating deadlock described in #85523 --- .../src/Properties/InternalsVisibleTo.cs | 6 ++++ .../RateLimiting/ConcurrencyLimiter.cs | 18 +++++++++- .../tests/ConcurrencyLimiterTests.cs | 35 +++++++++++++++++++ 3 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 src/libraries/System.Threading.RateLimiting/src/Properties/InternalsVisibleTo.cs diff --git a/src/libraries/System.Threading.RateLimiting/src/Properties/InternalsVisibleTo.cs b/src/libraries/System.Threading.RateLimiting/src/Properties/InternalsVisibleTo.cs new file mode 100644 index 00000000000000..78086e4d2350f2 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/Properties/InternalsVisibleTo.cs @@ -0,0 +1,6 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("System.Threading.RateLimiting.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001004b86c4cb78549b34bab61a3b1800e23bfeb5b3ec390074041536a7e3cbd97f5f04cf0f857155a8928eaa29ebfd11cfbbad3ba70efea7bda3226c6a8d370a4cd303f714486b6ebc225985a638471e6ef571cc92a4613c00b8fa65d61ccee0cbe5f36330c9a01f4183559f1bef24cc2917c6d913e3a541333a1d05d9bed22b38cb")] diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs index c2ba9cac9cc79e..e737828e9d1610 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs @@ -216,6 +216,12 @@ private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out Rat return false; } +#if DEBUG + // for unit testing + internal event Action? ReleasePreHook; + internal event Action? ReleasePostHook; +#endif + private void Release(int releaseCount) { using var disposer = default(RequestRegistration.Disposer); @@ -229,6 +235,10 @@ private void Release(int releaseCount) _permitCount += releaseCount; Debug.Assert(_permitCount <= _options.PermitLimit); +#if DEBUG + ReleasePreHook?.Invoke(); +#endif + while (_queue.Count > 0) { RequestRegistration nextPendingRequest = @@ -245,8 +255,14 @@ private void Release(int releaseCount) ? _queue.DequeueHead() : _queue.DequeueTail(); disposer.Add(nextPendingRequest); + continue; } - else if (_permitCount >= nextPendingRequest.Count) + +#if DEBUG + ReleasePostHook?.Invoke(); +#endif + + if (_permitCount >= nextPendingRequest.Count) { nextPendingRequest = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst diff --git a/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs index cc503a050dbc36..94ae6d70d77661 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs @@ -123,6 +123,41 @@ public override async Task CanAcquireResourceAsync_QueuesAndGrabsNewest() Assert.True(lease.IsAcquired); } +#if DEBUG + [Fact] + public Task DoesNotDeadlockCleaningUpCanceledRequestedLease_Pre() => + DoesNotDeadlockCleaningUpCanceledRequestedLease((limiter, hook) => limiter.ReleasePreHook += hook); + + [Fact] + public Task DoesNotDeadlockCleaningUpCanceledRequestedLease_Post() => + DoesNotDeadlockCleaningUpCanceledRequestedLease((limiter, hook) => limiter.ReleasePostHook += hook); + + private async Task DoesNotDeadlockCleaningUpCanceledRequestedLease(Action attachHook) + { + using var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 1 + }); + var lease = limiter.AttemptAcquire(1); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + _ = limiter.AcquireAsync(1, cts.Token); + attachHook(limiter, () => + { + Task.Run(cts.Cancel); + Thread.Sleep(1); + }); + + var task1 = Task.Delay(1000); + var task2 = Task.Run(lease.Dispose); + Assert.Same(task2, await Task.WhenAny(task1, task2)); + await task2; + } +#endif + [Fact] public override async Task FailsWhenQueuingMoreThanLimit_OldestFirst() { From a395d4cc2b86cebb583c323cb99e2134b4d8c873 Mon Sep 17 00:00:00 2001 From: Anton Tykhyy Date: Sat, 10 Jun 2023 22:25:06 +0300 Subject: [PATCH 6/7] Rework XyzRateLimiter._queueCount cleanup handling --- .../RateLimiting/ConcurrencyLimiter.cs | 38 ++++++++++--------- .../RateLimiting/FixedWindowRateLimiter.cs | 37 ++++++++++-------- .../RateLimiting/SlidingWindowRateLimiter.cs | 37 ++++++++++-------- .../RateLimiting/TokenBucketRateLimiter.cs | 38 ++++++++++--------- 4 files changed, 84 insertions(+), 66 deletions(-) diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs index e737828e9d1610..177aa65dfc061c 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs @@ -152,18 +152,13 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C do { RequestRegistration oldestRequest = _queue.DequeueHead(); - _queueCount -= oldestRequest.Count; - Debug.Assert(_queueCount >= 0); - if (!oldestRequest.TrySetResult(FailedLease)) - { - // Updating queue count is handled by the cancellation code - _queueCount += oldestRequest.Count; - } - else + if (oldestRequest.TrySetResult(FailedLease)) { + // Updating queue count is handled by the cancellation/cleanup code Interlocked.Increment(ref _failedLeasesCount); } disposer.Add(oldestRequest); + Debug.Assert(_queueCount >= 0); } while (_options.QueueLimit - _queueCount < permitCount); } @@ -269,8 +264,8 @@ private void Release(int releaseCount) ? _queue.DequeueHead() : _queue.DequeueTail(); + // Updating queue count is handled by the cancellation/cleanup code _permitCount -= nextPendingRequest.Count; - _queueCount -= nextPendingRequest.Count; Debug.Assert(_permitCount >= 0); ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count); @@ -279,8 +274,6 @@ private void Release(int releaseCount) { // Queued item was canceled so add count back _permitCount += nextPendingRequest.Count; - // Updating queue count is handled by the cancellation code - _queueCount += nextPendingRequest.Count; } else { @@ -327,6 +320,7 @@ protected override void Dispose(bool disposing) disposer.Add(next); next.TrySetResult(FailedLease); } + Debug.Assert(_queueCount == 0); } } @@ -419,17 +413,26 @@ public RequestRegistration(int permitCount, ConcurrencyLimiter limiter, Cancella #endif } - public int Count { get; } + /// + /// This property is only accessed under limiter lock. + /// + public int Count { get; private set; } private static void Cancel(object? state) { if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken)) { - var limiter = (ConcurrencyLimiter)registration.Task.AsyncState!; - lock (limiter.Lock) - { - limiter._queueCount -= registration.Count; - } + registration.Cleanup(); + } + } + + private void Cleanup() + { + var limiter = (ConcurrencyLimiter)Task.AsyncState!; + lock (limiter.Lock) + { + limiter._queueCount -= Count; + Count = 0; } } @@ -442,6 +445,7 @@ public struct Disposer : IDisposable public void Add(RequestRegistration request) { + request.Cleanup(); request._next = _next; _next = request; } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs index 16faeaec9f9919..6f2d1a00429059 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs @@ -169,17 +169,13 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C do { RequestRegistration oldestRequest = _queue.DequeueHead(); - _queueCount -= oldestRequest.Count; - Debug.Assert(_queueCount >= 0); - if (!oldestRequest.TrySetResult(FailedLease)) - { - _queueCount += oldestRequest.Count; - } - else + if (oldestRequest.TrySetResult(FailedLease)) { + // Updating queue count is handled by the cancellation/cleanup code Interlocked.Increment(ref _failedLeasesCount); } disposer.Add(oldestRequest); + Debug.Assert(_queueCount >= 0); } while (_options.QueueLimit - _queueCount < permitCount); } @@ -324,7 +320,7 @@ private void ReplenishInternal(long nowTicks) ? _queue.DequeueHead() : _queue.DequeueTail(); - _queueCount -= nextPendingRequest.Count; + // Updating queue count is handled by the cancellation/cleanup code _permitCount -= nextPendingRequest.Count; Debug.Assert(_permitCount >= 0); @@ -332,8 +328,6 @@ private void ReplenishInternal(long nowTicks) { // Queued item was canceled so add count back _permitCount += nextPendingRequest.Count; - // Updating queue count is handled by the cancellation code - _queueCount += nextPendingRequest.Count; } else { @@ -382,6 +376,7 @@ protected override void Dispose(bool disposing) disposer.Add(next); next.TrySetResult(FailedLease); } + Debug.Assert(_queueCount == 0); } } @@ -455,17 +450,26 @@ public RequestRegistration(int permitCount, FixedWindowRateLimiter limiter, Canc #endif } - public int Count { get; } + /// + /// This property is only accessed under limiter lock. + /// + public int Count { get; private set; } private static void Cancel(object? state) { if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken)) { - var limiter = (FixedWindowRateLimiter)registration.Task.AsyncState!; - lock (limiter.Lock) - { - limiter._queueCount -= registration.Count; - } + registration.Cleanup(); + } + } + + private void Cleanup() + { + var limiter = (FixedWindowRateLimiter)Task.AsyncState!; + lock (limiter.Lock) + { + limiter._queueCount -= Count; + Count = 0; } } @@ -478,6 +482,7 @@ public struct Disposer : IDisposable public void Add(RequestRegistration request) { + request.Cleanup(); request._next = _next; _next = request; } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs index 6b73c758bdf1c7..5df75525bd43a9 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs @@ -181,17 +181,13 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C do { RequestRegistration oldestRequest = _queue.DequeueHead(); - _queueCount -= oldestRequest.Count; - Debug.Assert(_queueCount >= 0); - if (!oldestRequest.TrySetResult(FailedLease)) - { - _queueCount += oldestRequest.Count; - } - else + if (oldestRequest.TrySetResult(FailedLease)) { + // Updating queue count is handled by the cancellation/cleanup code Interlocked.Increment(ref _failedLeasesCount); } disposer.Add(oldestRequest); + Debug.Assert(_queueCount >= 0); } while (_options.QueueLimit - _queueCount < permitCount); } @@ -335,7 +331,7 @@ private void ReplenishInternal(long nowTicks) ? _queue.DequeueHead() : _queue.DequeueTail(); - _queueCount -= nextPendingRequest.Count; + // Updating queue count is handled by the cancellation/cleanup code _permitCount -= nextPendingRequest.Count; _requestsPerSegment[_currentSegmentIndex] += nextPendingRequest.Count; Debug.Assert(_permitCount >= 0); @@ -345,8 +341,6 @@ private void ReplenishInternal(long nowTicks) // Queued item was canceled so add count back _permitCount += nextPendingRequest.Count; _requestsPerSegment[_currentSegmentIndex] -= nextPendingRequest.Count; - // Updating queue count is handled by the cancellation code - _queueCount += nextPendingRequest.Count; } else { @@ -395,6 +389,7 @@ protected override void Dispose(bool disposing) disposer.Add(next); next.TrySetResult(FailedLease); } + Debug.Assert(_queueCount == 0); } } @@ -468,17 +463,26 @@ public RequestRegistration(int permitCount, SlidingWindowRateLimiter limiter, Ca #endif } - public int Count { get; } + /// + /// This property is only accessed under limiter lock. + /// + public int Count { get; private set; } private static void Cancel(object? state) { if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken)) { - var limiter = (SlidingWindowRateLimiter)registration.Task.AsyncState!; - lock (limiter.Lock) - { - limiter._queueCount -= registration.Count; - } + registration.Cleanup(); + } + } + + private void Cleanup() + { + var limiter = (SlidingWindowRateLimiter)Task.AsyncState!; + lock (limiter.Lock) + { + limiter._queueCount -= Count; + Count = 0; } } @@ -491,6 +495,7 @@ public struct Disposer : IDisposable public void Add(RequestRegistration request) { + request.Cleanup(); request._next = _next; _next = request; } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs index f65bf70c9eb640..719eb8f611e961 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs @@ -174,18 +174,13 @@ protected override ValueTask AcquireAsyncCore(int tokenCount, Ca do { RequestRegistration oldestRequest = _queue.DequeueHead(); - _queueCount -= oldestRequest.Count; - Debug.Assert(_queueCount >= 0); - if (!oldestRequest.TrySetResult(FailedLease)) - { - // Updating queue count is handled by the cancellation code - _queueCount += oldestRequest.Count; - } - else + if (oldestRequest.TrySetResult(FailedLease)) { + // Updating queue count is handled by the cancellation/cleanup code Interlocked.Increment(ref _failedLeasesCount); } disposer.Add(oldestRequest); + Debug.Assert(_queueCount >= 0); } while (_options.QueueLimit - _queueCount < tokenCount); } @@ -339,7 +334,7 @@ private void ReplenishInternal(long nowTicks) ? queue.DequeueHead() : queue.DequeueTail(); - _queueCount -= nextPendingRequest.Count; + // Updating queue count is handled by the cancellation/cleanup code _tokenCount -= nextPendingRequest.Count; Debug.Assert(_tokenCount >= 0); @@ -347,8 +342,6 @@ private void ReplenishInternal(long nowTicks) { // Queued item was canceled so add count back _tokenCount += nextPendingRequest.Count; - // Updating queue count is handled by the cancellation code - _queueCount += nextPendingRequest.Count; } else { @@ -397,6 +390,7 @@ protected override void Dispose(bool disposing) disposer.Add(next); next.TrySetResult(FailedLease); } + Debug.Assert(_queueCount == 0); } } @@ -470,17 +464,26 @@ public RequestRegistration(int permitCount, TokenBucketRateLimiter limiter, Canc #endif } - public int Count { get; } + /// + /// This property is only accessed under limiter lock. + /// + public int Count { get; private set; } private static void Cancel(object? state) { if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken)) { - var limiter = (TokenBucketRateLimiter)registration.Task.AsyncState!; - lock (limiter.Lock) - { - limiter._queueCount -= registration.Count; - } + registration.Cleanup(); + } + } + + private void Cleanup() + { + var limiter = (TokenBucketRateLimiter)Task.AsyncState!; + lock (limiter.Lock) + { + limiter._queueCount -= Count; + Count = 0; } } @@ -493,6 +496,7 @@ public struct Disposer : IDisposable public void Add(RequestRegistration request) { + request.Cleanup(); request._next = _next; _next = request; } From e3509b4ce56b372a6d50b087295568c4c3c3772a Mon Sep 17 00:00:00 2001 From: Anton Tykhyy Date: Mon, 12 Jun 2023 00:09:08 +0300 Subject: [PATCH 7/7] Rename method to express intent better --- .../Threading/RateLimiting/ConcurrencyLimiter.cs | 10 +++++----- .../Threading/RateLimiting/FixedWindowRateLimiter.cs | 10 +++++----- .../Threading/RateLimiting/SlidingWindowRateLimiter.cs | 10 +++++----- .../Threading/RateLimiting/TokenBucketRateLimiter.cs | 10 +++++----- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs index 177aa65dfc061c..44dac52ee908c4 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs @@ -157,7 +157,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C // Updating queue count is handled by the cancellation/cleanup code Interlocked.Increment(ref _failedLeasesCount); } - disposer.Add(oldestRequest); + disposer.CleanupAndAdd(oldestRequest); Debug.Assert(_queueCount >= 0); } while (_options.QueueLimit - _queueCount < permitCount); @@ -249,7 +249,7 @@ private void Release(int releaseCount) _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - disposer.Add(nextPendingRequest); + disposer.CleanupAndAdd(nextPendingRequest); continue; } @@ -279,7 +279,7 @@ private void Release(int releaseCount) { Interlocked.Increment(ref _successfulLeasesCount); } - disposer.Add(nextPendingRequest); + disposer.CleanupAndAdd(nextPendingRequest); Debug.Assert(_queueCount >= 0); } else @@ -317,7 +317,7 @@ protected override void Dispose(bool disposing) RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - disposer.Add(next); + disposer.CleanupAndAdd(next); next.TrySetResult(FailedLease); } Debug.Assert(_queueCount == 0); @@ -443,7 +443,7 @@ public struct Disposer : IDisposable { private RequestRegistration? _next; - public void Add(RequestRegistration request) + public void CleanupAndAdd(RequestRegistration request) { request.Cleanup(); request._next = _next; diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs index 6f2d1a00429059..71c55d8527a6ae 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs @@ -174,7 +174,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C // Updating queue count is handled by the cancellation/cleanup code Interlocked.Increment(ref _failedLeasesCount); } - disposer.Add(oldestRequest); + disposer.CleanupAndAdd(oldestRequest); Debug.Assert(_queueCount >= 0); } while (_options.QueueLimit - _queueCount < permitCount); @@ -310,7 +310,7 @@ private void ReplenishInternal(long nowTicks) _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - disposer.Add(nextPendingRequest); + disposer.CleanupAndAdd(nextPendingRequest); } else if (_permitCount >= nextPendingRequest.Count) { @@ -333,7 +333,7 @@ private void ReplenishInternal(long nowTicks) { Interlocked.Increment(ref _successfulLeasesCount); } - disposer.Add(nextPendingRequest); + disposer.CleanupAndAdd(nextPendingRequest); Debug.Assert(_queueCount >= 0); } else @@ -373,7 +373,7 @@ protected override void Dispose(bool disposing) RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - disposer.Add(next); + disposer.CleanupAndAdd(next); next.TrySetResult(FailedLease); } Debug.Assert(_queueCount == 0); @@ -480,7 +480,7 @@ public struct Disposer : IDisposable { private RequestRegistration? _next; - public void Add(RequestRegistration request) + public void CleanupAndAdd(RequestRegistration request) { request.Cleanup(); request._next = _next; diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs index 5df75525bd43a9..6e6db5bf058db7 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs @@ -186,7 +186,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C // Updating queue count is handled by the cancellation/cleanup code Interlocked.Increment(ref _failedLeasesCount); } - disposer.Add(oldestRequest); + disposer.CleanupAndAdd(oldestRequest); Debug.Assert(_queueCount >= 0); } while (_options.QueueLimit - _queueCount < permitCount); @@ -320,7 +320,7 @@ private void ReplenishInternal(long nowTicks) _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - disposer.Add(nextPendingRequest); + disposer.CleanupAndAdd(nextPendingRequest); } // If we have enough permits after replenishing to serve the queued requests else if (_permitCount >= nextPendingRequest.Count) @@ -346,7 +346,7 @@ private void ReplenishInternal(long nowTicks) { Interlocked.Increment(ref _successfulLeasesCount); } - disposer.Add(nextPendingRequest); + disposer.CleanupAndAdd(nextPendingRequest); Debug.Assert(_queueCount >= 0); } else @@ -386,7 +386,7 @@ protected override void Dispose(bool disposing) RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - disposer.Add(next); + disposer.CleanupAndAdd(next); next.TrySetResult(FailedLease); } Debug.Assert(_queueCount == 0); @@ -493,7 +493,7 @@ public struct Disposer : IDisposable { private RequestRegistration? _next; - public void Add(RequestRegistration request) + public void CleanupAndAdd(RequestRegistration request) { request.Cleanup(); request._next = _next; diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs index 719eb8f611e961..3cea7f39988e9f 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs @@ -179,7 +179,7 @@ protected override ValueTask AcquireAsyncCore(int tokenCount, Ca // Updating queue count is handled by the cancellation/cleanup code Interlocked.Increment(ref _failedLeasesCount); } - disposer.Add(oldestRequest); + disposer.CleanupAndAdd(oldestRequest); Debug.Assert(_queueCount >= 0); } while (_options.QueueLimit - _queueCount < tokenCount); @@ -324,7 +324,7 @@ private void ReplenishInternal(long nowTicks) _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? queue.DequeueHead() : queue.DequeueTail(); - disposer.Add(nextPendingRequest); + disposer.CleanupAndAdd(nextPendingRequest); } else if (_tokenCount >= nextPendingRequest.Count) { @@ -347,7 +347,7 @@ private void ReplenishInternal(long nowTicks) { Interlocked.Increment(ref _successfulLeasesCount); } - disposer.Add(nextPendingRequest); + disposer.CleanupAndAdd(nextPendingRequest); Debug.Assert(_queueCount >= 0); } else @@ -387,7 +387,7 @@ protected override void Dispose(bool disposing) RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - disposer.Add(next); + disposer.CleanupAndAdd(next); next.TrySetResult(FailedLease); } Debug.Assert(_queueCount == 0); @@ -494,7 +494,7 @@ public struct Disposer : IDisposable { private RequestRegistration? _next; - public void Add(RequestRegistration request) + public void CleanupAndAdd(RequestRegistration request) { request.Cleanup(); request._next = _next;