From 55af2475e356c0a33e7907a6d2919411c7973b3b Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Tue, 11 May 2021 09:34:14 -0700 Subject: [PATCH 01/12] Improve the rate of thread injection for blocking due to sync-over-async Fixes https://github.com/dotnet/runtime/issues/52558 --- .../System/Threading/ThreadPool.CoreCLR.cs | 16 + .../System.Private.CoreLib.Shared.projitems | 3 +- .../Threading/PortableThreadPool.Blocking.cs | 386 ++++++++++++++++++ .../PortableThreadPool.GateThread.cs | 220 ++++++++-- .../PortableThreadPool.HillClimbing.cs | 25 +- .../PortableThreadPool.ThreadCounts.cs | 32 +- .../PortableThreadPool.WorkerThread.cs | 55 +-- .../System/Threading/PortableThreadPool.cs | 222 +++++----- .../src/System/Threading/Tasks/Task.cs | 20 +- .../System/Threading/ThreadPool.Portable.cs | 3 + .../System/Threading/ThreadPoolWorkQueue.cs | 22 +- 11 files changed, 803 insertions(+), 201 deletions(-) create mode 100644 src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs diff --git a/src/coreclr/System.Private.CoreLib/src/System/Threading/ThreadPool.CoreCLR.cs b/src/coreclr/System.Private.CoreLib/src/System/Threading/ThreadPool.CoreCLR.cs index 95b283e707c69e..dbf50bd81d080f 100644 --- a/src/coreclr/System.Private.CoreLib/src/System/Threading/ThreadPool.CoreCLR.cs +++ b/src/coreclr/System.Private.CoreLib/src/System/Threading/ThreadPool.CoreCLR.cs @@ -522,6 +522,22 @@ internal static void NotifyWorkItemProgress() [MethodImpl(MethodImplOptions.InternalCall)] private static extern void NotifyWorkItemProgressNative(); + internal static void NotifyThreadBlocked() + { + if (UsePortableThreadPool) + { + PortableThreadPool.ThreadPoolInstance.NotifyThreadBlocked(); + } + } + + internal static void NotifyThreadUnblocked() + { + if (UsePortableThreadPool) + { + PortableThreadPool.ThreadPoolInstance.NotifyThreadUnblocked(); + } + } + internal static object? GetOrCreateThreadLocalCompletionCountObject() => UsePortableThreadPool ? PortableThreadPool.ThreadPoolInstance.GetOrCreateThreadLocalCompletionCountObject() : null; diff --git a/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems b/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems index 22a9b4141336e2..8c0515110d95c1 100644 --- a/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems +++ b/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems @@ -2166,9 +2166,10 @@ - + + diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs new file mode 100644 index 00000000000000..7065353338e796 --- /dev/null +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs @@ -0,0 +1,386 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics; + +namespace System.Threading +{ + internal sealed partial class PortableThreadPool + { + public short MinThreadsGoal + { + get + { + _threadAdjustmentLock.VerifyIsLocked(); + return Math.Min(_separated.numThreadsGoal, TargetThreadsGoalForBlockingAdjustment); + } + } + + private short TargetThreadsGoalForBlockingAdjustment + { + get + { + _threadAdjustmentLock.VerifyIsLocked(); + + short targetThreadsGoal = _minThreads; + if (_numBlockedThreads <= 0) + { + return targetThreadsGoal; + } + + short maxThreads = MaxThreadsForBlockingAdjustment; + targetThreadsGoal += _numBlockedThreads; + if (targetThreadsGoal < _numBlockedThreads || targetThreadsGoal > maxThreads) + { + targetThreadsGoal = maxThreads; + } + return targetThreadsGoal; + } + } + + private short MaxThreadsForBlockingAdjustment + { + get + { + _threadAdjustmentLock.VerifyIsLocked(); + + short result = (short)(_minThreads + BlockingConfig.MaxThreadsToAddBeforeFallback); + return result < BlockingConfig.MaxThreadsToAddBeforeFallback || result > _maxThreads ? _maxThreads : result; + } + } + + public void NotifyThreadBlocked() + { + if (!BlockingConfig.IsCooperativeBlockingEnabled || !Thread.CurrentThread.IsThreadPoolThread) + { + return; + } + + bool wakeGateThread = false; + _threadAdjustmentLock.Acquire(); + try + { + _numBlockedThreads++; + Debug.Assert(_numBlockedThreads > 0); + + if (_pendingBlockingAdjustment != PendingBlockingAdjustment.WithDelayIfNecessary && + _separated.numThreadsGoal < TargetThreadsGoalForBlockingAdjustment) + { + if (_pendingBlockingAdjustment == PendingBlockingAdjustment.None) + { + wakeGateThread = true; + } + _pendingBlockingAdjustment = PendingBlockingAdjustment.WithDelayIfNecessary; + } + } + finally + { + _threadAdjustmentLock.Release(); + } + + if (wakeGateThread) + { + GateThread.Wake(this); + } + } + + public void NotifyThreadUnblocked() + { + if (!BlockingConfig.IsCooperativeBlockingEnabled || !Thread.CurrentThread.IsThreadPoolThread) + { + return; + } + + bool wakeGateThread = false; + _threadAdjustmentLock.Acquire(); + try + { + Debug.Assert(_numBlockedThreads > 0); + _numBlockedThreads--; + + if (_pendingBlockingAdjustment != PendingBlockingAdjustment.Immediately && + _numThreadsAddedDueToBlocking > 0 && + _separated.numThreadsGoal > TargetThreadsGoalForBlockingAdjustment) + { + wakeGateThread = true; + _pendingBlockingAdjustment = PendingBlockingAdjustment.Immediately; + } + } + finally + { + _threadAdjustmentLock.Release(); + } + + if (wakeGateThread) + { + GateThread.Wake(this); + } + } + + private uint PerformBlockingAdjustment(bool previousDelayElapsed) + { + uint nextDelayMs; + bool addWorker; + _threadAdjustmentLock.Acquire(); + try + { + nextDelayMs = PerformBlockingAdjustment(previousDelayElapsed, out addWorker); + } + finally + { + _threadAdjustmentLock.Release(); + } + + if (addWorker) + { + WorkerThread.MaybeAddWorkingWorker(this); + } + return nextDelayMs; + } + + private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWorker) + { + _threadAdjustmentLock.VerifyIsLocked(); + Debug.Assert(_pendingBlockingAdjustment != PendingBlockingAdjustment.None); + + _pendingBlockingAdjustment = PendingBlockingAdjustment.None; + addWorker = false; + + short targetThreadsGoal = TargetThreadsGoalForBlockingAdjustment; + short numThreadsGoal = _separated.numThreadsGoal; + if (numThreadsGoal == targetThreadsGoal) + { + return 0; + } + + if (numThreadsGoal > targetThreadsGoal) + { + // The goal is only decreased by how much it was increased in total due to blocking adjustments. This is to + // allow blocking adjustments to play well with starvation and hill climbing, either of which may increase the + // goal independently for other reasons, and blocking adjustments should not undo those changes. + if (_numThreadsAddedDueToBlocking <= 0) + { + return 0; + } + + short toSubtract = Math.Min((short)(numThreadsGoal - targetThreadsGoal), _numThreadsAddedDueToBlocking); + _numThreadsAddedDueToBlocking -= toSubtract; + _separated.numThreadsGoal = numThreadsGoal -= toSubtract; + HillClimbing.ThreadPoolHillClimber.ForceChange( + numThreadsGoal, + HillClimbing.StateOrTransition.CooperativeBlocking, + logTransition: false); // reduce noise + return 0; + } + + short maxThreads = MaxThreadsForBlockingAdjustment; + short configuredMaxThreadsWithoutDelay = (short)(_minThreads + BlockingConfig.ThreadsToAddWithoutDelay); + if (configuredMaxThreadsWithoutDelay < BlockingConfig.ThreadsToAddWithoutDelay) + { + configuredMaxThreadsWithoutDelay = maxThreads; + } + + do + { + // Calculate how many threads can be added without a delay. Threads that were already created but may be just + // waiting for work can be released for work without a delay, but creating a new thread may need a delay. + ThreadCounts counts = _separated.counts; + short maxThreadsGoalWithoutDelay = + Math.Max(configuredMaxThreadsWithoutDelay, Math.Min(counts.NumExistingThreads, maxThreads)); + short targetThreadsGoalWithoutDelay = Math.Min(targetThreadsGoal, maxThreadsGoalWithoutDelay); + short newNumThreadsGoal; + if (numThreadsGoal < targetThreadsGoalWithoutDelay) + { + newNumThreadsGoal = targetThreadsGoalWithoutDelay; + } + else if (previousDelayElapsed) + { + newNumThreadsGoal = (short)(numThreadsGoal + 1); + } + else + { + // Need to induce a delay before adding a thread + break; + } + + do + { + if (newNumThreadsGoal <= counts.NumExistingThreads) + { + break; + } + + // + // Threads would likely need to be created to compensate for blocking, so check memory usage and limits + // + + long memoryLimitBytes = _memoryLimitBytes; + if (memoryLimitBytes <= 0) + { + break; + } + + // Memory usage is updated after gen 2 GCs, and roughly represents how much physical memory was in use at + // the time of the last gen 2 GC. When new threads are also blocking, they may not have used their typical + // amount of stack space, and gen 2 GCs may not be happening to update the memory usage. Account for a bit + // of extra stack space usage in the future for each thread. + long memoryUsageBytes = + _memoryUsageBytes + + counts.NumExistingThreads * (long)WorkerThread.EstimatedAdditionalStackUsagePerThreadBytes; + + // The memory limit may already be less than the total amount of physical memory. We are only accounting for + // thread pool worker threads above, and after fallback starvation will have to continue creating threads + // slowly to prevent a deadlock, so calculate a threshold before falling back by giving the memory limit + // some additional buffer. + long memoryThresholdForFallbackBytes = memoryLimitBytes * 8 / 10; + if (memoryUsageBytes >= memoryThresholdForFallbackBytes) + { + return 0; + } + + // Determine how many threads can be added without exceeding the memory threshold + long achievableNumThreadsGoal = + counts.NumExistingThreads + + (memoryThresholdForFallbackBytes - memoryUsageBytes) / + WorkerThread.EstimatedAdditionalStackUsagePerThreadBytes; + newNumThreadsGoal = (short)Math.Min(newNumThreadsGoal, achievableNumThreadsGoal); + if (newNumThreadsGoal <= numThreadsGoal) + { + return 0; + } + } while (false); + + _numThreadsAddedDueToBlocking += (short)(newNumThreadsGoal - numThreadsGoal); + _separated.numThreadsGoal = newNumThreadsGoal; + HillClimbing.ThreadPoolHillClimber.ForceChange( + newNumThreadsGoal, + HillClimbing.StateOrTransition.CooperativeBlocking, + logTransition: newNumThreadsGoal == maxThreadsGoalWithoutDelay || previousDelayElapsed); // reduce noise + if (counts.NumProcessingWork >= numThreadsGoal && _separated.numRequestedWorkers > 0) + { + addWorker = true; + } + + numThreadsGoal = newNumThreadsGoal; + if (numThreadsGoal >= targetThreadsGoal) + { + return 0; + } + } while (false); + + // Calculate how much delay to induce before another thread is created. These operations don't overflow because of + // limits on max thread count and max delays. + _pendingBlockingAdjustment = PendingBlockingAdjustment.WithDelayIfNecessary; + int delayStepCount = 1 + (numThreadsGoal - configuredMaxThreadsWithoutDelay) / BlockingConfig.ThreadsPerDelayStep; + return Math.Min((uint)delayStepCount * BlockingConfig.DelayStepMs, BlockingConfig.MaxDelayUntilFallbackMs); + } + + private enum PendingBlockingAdjustment : byte + { + None, + Immediately, + WithDelayIfNecessary + } + + private static class BlockingConfig + { + public static readonly bool IsCooperativeBlockingEnabled = + AppContextConfigHelper.GetBooleanConfig("System.Threading.ThreadPool.Blocking.CooperativeBlocking", true); + + public static readonly short ThreadsToAddWithoutDelay; + public static readonly short MaxThreadsToAddBeforeFallback; + public static readonly short ThreadsPerDelayStep; + public static readonly uint DelayStepMs; + public static readonly uint MaxDelayUntilFallbackMs; + +#pragma warning disable CA1810 // remove the explicit static constructor + static BlockingConfig() + { + // Summary description of how blocking compensation works and how the config settings below are used: + // - After the thread count based on MinThreads is reached, up to ThreadsToAddWithoutDelay additional threads + // may be created without a delay + // - After that, before each additional thread is created, a delay is induced, starting with DelayStepMs + // - For every ThreadsPerDelayStep threads that are added with a delay, an additional DelayStepMs is added to + // the delay + // - Until MaxThreadsToAddBeforeFallback threads are added, the delay may not exceed MaxDelayUntilFallbackMs + // - After MaxThreadsToAddBeforeFallback threads are added, the system operates in fallback mode where a thread + // would be created if starvation is detected and typically with longer delays + // - Delays are only induced before creating threads. If threads are already available, they would be released + // without delay to compensate for cooperative blocking. + // - Physical memory usage and limits are also used and beyond a threshold, the system switches to fallback mode + + // After the thread count based on MinThreads is reached, this value (after it is multiplied by the processor + // count) specifies how many additional threads may be created without a delay + int blocking_threadsToAddWithoutDelay_procCountFactor = + AppContextConfigHelper.GetInt32Config( + "System.Threading.ThreadPool.Blocking.ThreadsToAddWithoutDelay_ProcCountFactor", + 1, + false); + + // After the thread count based on MinThreads is reached, this value (after it is multiplied by the processor + // count) specifies how many additional threads may be created, perhaps with delays, before fallback. In + // fallback mode a thread would be created if starvation is detected and typically with longer delays. + int blocking_maxThreadsToAddBeforeFallback_procCountFactor = + AppContextConfigHelper.GetInt32Config( + "System.Threading.ThreadPool.Blocking.MaxThreadsToAddBeforeFallback_ProcCountFactor", + 10, + false); + + // After the thread count based on ThreadsToAddWithoutDelay is reached, this value (after it is multiplied by + // the processor count) specifies after how many threads an additional DelayStepMs would be added to the delay + // before each new thread is created + int blocking_threadsPerDelayStep_procCountFactor = + AppContextConfigHelper.GetInt32Config( + "System.Threading.ThreadPool.Blocking.ThreadsPerDelayStep_ProcCountFactor", + 1, + false); + + // After the thread count based on ThreadsToAddWithoutDelay is reached, this value specifies how much additional + // delay to add per ThreadsPerDelayStep threads, which would be applied before each new thread is created + DelayStepMs = + (uint)AppContextConfigHelper.GetInt32Config( + "System.Threading.ThreadPool.Blocking.DelayStepMs", + 25, + false); + + // After the thread count based on ThreadsToAddWithoutDelay is reached, this value specifies the max delay to + // use before each new thread is created, until fallback. Fallback mode is used after the thread count based on + // MaxThreadsToAddBeforeFallback is reached, see that config value for more information. + MaxDelayUntilFallbackMs = + (uint)AppContextConfigHelper.GetInt32Config( + "System.Threading.ThreadPool.Blocking.MaxDelayUntilFallbackMs", + 250, + false); + + int processorCount = Environment.ProcessorCount; + MaxThreadsToAddBeforeFallback = (short)(processorCount * blocking_maxThreadsToAddBeforeFallback_procCountFactor); + if (MaxThreadsToAddBeforeFallback > MaxPossibleThreadCount || + MaxThreadsToAddBeforeFallback / processorCount != blocking_maxThreadsToAddBeforeFallback_procCountFactor) + { + MaxThreadsToAddBeforeFallback = MaxPossibleThreadCount; + } + + ThreadsToAddWithoutDelay = (short)(processorCount * blocking_threadsToAddWithoutDelay_procCountFactor); + if (ThreadsToAddWithoutDelay > MaxThreadsToAddBeforeFallback || + ThreadsToAddWithoutDelay / processorCount != blocking_threadsToAddWithoutDelay_procCountFactor) + { + ThreadsToAddWithoutDelay = MaxThreadsToAddBeforeFallback; + } + + blocking_threadsPerDelayStep_procCountFactor = Math.Max(1, blocking_threadsPerDelayStep_procCountFactor); + short maxThreadsPerDelayStep = (short)(MaxThreadsToAddBeforeFallback - ThreadsToAddWithoutDelay); + ThreadsPerDelayStep = + (short)(processorCount * blocking_threadsPerDelayStep_procCountFactor); + if (ThreadsPerDelayStep > maxThreadsPerDelayStep || + ThreadsPerDelayStep / processorCount != blocking_threadsPerDelayStep_procCountFactor) + { + ThreadsPerDelayStep = maxThreadsPerDelayStep; + } + + MaxDelayUntilFallbackMs = Math.Max(1, Math.Min(MaxDelayUntilFallbackMs, GateThread.GateActivitiesPeriodMs)); + DelayStepMs = Math.Max(1, Math.Min(DelayStepMs, MaxDelayUntilFallbackMs)); + } +#pragma warning restore CA1810 + } + } +} diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs index eba5c66d7e0a3c..fefe16da418eb3 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs @@ -12,14 +12,14 @@ internal sealed partial class PortableThreadPool { private static class GateThread { - private const int GateThreadDelayMs = 500; - private const int DequeueDelayThresholdMs = GateThreadDelayMs * 2; + public const uint GateActivitiesPeriodMs = 500; + private const uint DequeueDelayThresholdMs = GateActivitiesPeriodMs * 2; private const int GateThreadRunningMask = 0x4; - - private static readonly AutoResetEvent s_runGateThreadEvent = new AutoResetEvent(initialState: true); - private const int MaxRuns = 2; + private static readonly AutoResetEvent RunGateThreadEvent = new AutoResetEvent(initialState: true); + private static readonly AutoResetEvent DelayEvent = new AutoResetEvent(initialState: false); + private static void GateThreadStart() { bool disableStarvationDetection = @@ -33,16 +33,68 @@ private static void GateThreadStart() _ = cpuUtilizationReader.CurrentUtilization; PortableThreadPool threadPoolInstance = ThreadPoolInstance; - LowLevelLock hillClimbingThreadAdjustmentLock = threadPoolInstance._hillClimbingThreadAdjustmentLock; + LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock; + DelayHelper delayHelper = default; + + if (BlockingConfig.IsCooperativeBlockingEnabled) + { + // Initialize memory usage and limits, and register to update them on gen 2 GCs + threadPoolInstance.OnGen2GCCallback(); + Gen2GcCallback.Register(threadPoolInstance.OnGen2GCCallback); + } while (true) { - s_runGateThreadEvent.WaitOne(); + RunGateThreadEvent.WaitOne(); + int currentTimeMs = Environment.TickCount; + delayHelper.SetGateActivitiesTime(currentTimeMs); - bool needGateThreadForRuntime; - do + while (true) { - Thread.Sleep(GateThreadDelayMs); + bool wasSignaledToWake = DelayEvent.WaitOne((int)delayHelper.GetNextDelay(currentTimeMs)); + currentTimeMs = Environment.TickCount; + + // Thread count adjustment for cooperative blocking + do + { + PendingBlockingAdjustment pendingBlockingAdjustment = threadPoolInstance._pendingBlockingAdjustment; + if (pendingBlockingAdjustment == PendingBlockingAdjustment.None) + { + delayHelper.ClearBlockingAdjustmentDelay(); + break; + } + + bool previousDelayElapsed = false; + if (delayHelper.HasBlockingAdjustmentDelay) + { + previousDelayElapsed = + delayHelper.HasBlockingAdjustmentDelayElapsed(currentTimeMs, wasSignaledToWake); + if (pendingBlockingAdjustment == PendingBlockingAdjustment.WithDelayIfNecessary && + !previousDelayElapsed) + { + break; + } + } + + uint nextDelayMs = threadPoolInstance.PerformBlockingAdjustment(previousDelayElapsed); + if (nextDelayMs <= 0) + { + delayHelper.ClearBlockingAdjustmentDelay(); + } + else + { + delayHelper.SetBlockingAdjustmentTimeAndDelay(currentTimeMs, nextDelayMs); + } + } while (false); + + // + // Periodic gate activities + // + + if (!delayHelper.ShouldPerformGateActivities(currentTimeMs, wasSignaledToWake)) + { + continue; + } if (ThreadPool.EnableWorkerTracking && NativeRuntimeEventSource.Log.IsEnabled()) { @@ -53,17 +105,17 @@ private static void GateThreadStart() int cpuUtilization = cpuUtilizationReader.CurrentUtilization; threadPoolInstance._cpuUtilization = cpuUtilization; - needGateThreadForRuntime = ThreadPool.PerformRuntimeSpecificGateActivities(cpuUtilization); + bool needGateThreadForRuntime = ThreadPool.PerformRuntimeSpecificGateActivities(cpuUtilization); if (!disableStarvationDetection && + threadPoolInstance._pendingBlockingAdjustment == PendingBlockingAdjustment.None && threadPoolInstance._separated.numRequestedWorkers > 0 && SufficientDelaySinceLastDequeue(threadPoolInstance)) { + bool addWorker = false; + threadAdjustmentLock.Acquire(); try { - hillClimbingThreadAdjustmentLock.Acquire(); - ThreadCounts counts = threadPoolInstance._separated.counts.VolatileRead(); - // Don't add a thread if we're at max or if we are already in the process of adding threads. // This logic is slightly different from the native implementation in CoreCLR because there are // no retired threads. In the native implementation, when hill climbing reduces the thread count @@ -73,61 +125,67 @@ private static void GateThreadStart() // stopped from working by hill climbing, so here the number of threads processing work, instead // of the number of existing threads, is compared with the goal. There may be alternative // solutions, for now this is only to maintain consistency in behavior. - while ( - counts.NumExistingThreads < threadPoolInstance._maxThreads && - counts.NumProcessingWork >= counts.NumThreadsGoal) + ThreadCounts counts = threadPoolInstance._separated.counts; + if (counts.NumProcessingWork < threadPoolInstance._maxThreads && + counts.NumProcessingWork >= threadPoolInstance._separated.numThreadsGoal) { if (debuggerBreakOnWorkStarvation) { Debugger.Break(); } - ThreadCounts newCounts = counts; short newNumThreadsGoal = (short)(counts.NumProcessingWork + 1); - newCounts.NumThreadsGoal = newNumThreadsGoal; - - ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts); - if (oldCounts == counts) - { - HillClimbing.ThreadPoolHillClimber.ForceChange(newNumThreadsGoal, HillClimbing.StateOrTransition.Starvation); - WorkerThread.MaybeAddWorkingWorker(threadPoolInstance); - break; - } - - counts = oldCounts; + threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal; + HillClimbing.ThreadPoolHillClimber.ForceChange( + newNumThreadsGoal, + HillClimbing.StateOrTransition.Starvation); + addWorker = true; } } finally { - hillClimbingThreadAdjustmentLock.Release(); + threadAdjustmentLock.Release(); + } + + if (addWorker) + { + WorkerThread.MaybeAddWorkingWorker(threadPoolInstance); } } - } while ( - needGateThreadForRuntime || - threadPoolInstance._separated.numRequestedWorkers > 0 || - Interlocked.Decrement(ref threadPoolInstance._separated.gateThreadRunningState) > GetRunningStateForNumRuns(0)); + + if (!needGateThreadForRuntime && + threadPoolInstance._separated.numRequestedWorkers <= 0 && + threadPoolInstance._pendingBlockingAdjustment == PendingBlockingAdjustment.None && + Interlocked.Decrement(ref threadPoolInstance._separated.gateThreadRunningState) <= GetRunningStateForNumRuns(0)) + { + break; + } + } } } + public static void Wake(PortableThreadPool threadPoolInstance) + { + DelayEvent.Set(); + EnsureRunning(threadPoolInstance); + } + // called by logic to spawn new worker threads, return true if it's been too long // since the last dequeue operation - takes number of worker threads into account // in deciding "too long" private static bool SufficientDelaySinceLastDequeue(PortableThreadPool threadPoolInstance) { - int delay = Environment.TickCount - Volatile.Read(ref threadPoolInstance._separated.lastDequeueTime); - - int minimumDelay; - + uint delay = (uint)(Environment.TickCount - threadPoolInstance._separated.lastDequeueTime); + uint minimumDelay; if (threadPoolInstance._cpuUtilization < CpuUtilizationLow) { - minimumDelay = GateThreadDelayMs; + minimumDelay = GateActivitiesPeriodMs; } else { - ThreadCounts counts = threadPoolInstance._separated.counts.VolatileRead(); - int numThreads = counts.NumThreadsGoal; - minimumDelay = numThreads * DequeueDelayThresholdMs; + minimumDelay = (uint)threadPoolInstance._separated.numThreadsGoal * DequeueDelayThresholdMs; } + return delay > minimumDelay; } @@ -148,7 +206,7 @@ internal static void EnsureRunningSlow(PortableThreadPool threadPoolInstance) int numRunsMask = Interlocked.Exchange(ref threadPoolInstance._separated.gateThreadRunningState, GetRunningStateForNumRuns(MaxRuns)); if (numRunsMask == GetRunningStateForNumRuns(0)) { - s_runGateThreadEvent.Set(); + RunGateThreadEvent.Set(); } else if ((numRunsMask & GateThreadRunningMask) == 0) { @@ -188,6 +246,82 @@ private static void CreateGateThread(PortableThreadPool threadPoolInstance) } } } + + private struct DelayHelper + { + private int _previousGateActivitiesTimeMs; + private int _previousBlockingAdjustmentDelayStartTimeMs; + private uint _previousBlockingAdjustmentDelayMs; + private bool _runGateActivitiesAfterNextDelay; + private bool _adjustForBlockingAfterNextDelay; + + public void SetGateActivitiesTime(int currentTimeMs) + { + _previousGateActivitiesTimeMs = currentTimeMs; + } + + public void SetBlockingAdjustmentTimeAndDelay(int currentTimeMs, uint delayMs) + { + _previousBlockingAdjustmentDelayStartTimeMs = currentTimeMs; + _previousBlockingAdjustmentDelayMs = delayMs; + } + + public void ClearBlockingAdjustmentDelay() => _previousBlockingAdjustmentDelayMs = 0; + public bool HasBlockingAdjustmentDelay => _previousBlockingAdjustmentDelayMs != 0; + + public uint GetNextDelay(int currentTimeMs) + { + uint elapsedMsSincePreviousGateActivities = (uint)(currentTimeMs - _previousGateActivitiesTimeMs); + uint nextDelayForGateActivities = + elapsedMsSincePreviousGateActivities < GateActivitiesPeriodMs + ? GateActivitiesPeriodMs - elapsedMsSincePreviousGateActivities + : 1; + if (_previousBlockingAdjustmentDelayMs == 0) + { + _runGateActivitiesAfterNextDelay = true; + _adjustForBlockingAfterNextDelay = false; + return nextDelayForGateActivities; + } + + uint elapsedMsSincePreviousBlockingAdjustmentDelay = + (uint)(currentTimeMs - _previousBlockingAdjustmentDelayStartTimeMs); + uint nextDelayForBlockingAdjustment = + elapsedMsSincePreviousBlockingAdjustmentDelay < _previousBlockingAdjustmentDelayMs + ? _previousBlockingAdjustmentDelayMs - elapsedMsSincePreviousBlockingAdjustmentDelay + : 1; + uint nextDelay = Math.Min(nextDelayForGateActivities, nextDelayForBlockingAdjustment); + _runGateActivitiesAfterNextDelay = nextDelay == nextDelayForGateActivities; + _adjustForBlockingAfterNextDelay = nextDelay == nextDelayForBlockingAdjustment; + Debug.Assert(nextDelay <= GateActivitiesPeriodMs); + return nextDelay; + } + + public bool ShouldPerformGateActivities(int currentTimeMs, bool wasSignaledToWake) + { + bool result = + (!wasSignaledToWake && _runGateActivitiesAfterNextDelay) || + (uint)(currentTimeMs - _previousGateActivitiesTimeMs) >= GateActivitiesPeriodMs; + if (result) + { + SetGateActivitiesTime(currentTimeMs); + } + return result; + } + + public bool HasBlockingAdjustmentDelayElapsed(int currentTimeMs, bool wasSignaledToWake) + { + Debug.Assert(HasBlockingAdjustmentDelay); + + if (!wasSignaledToWake && _adjustForBlockingAfterNextDelay) + { + return true; + } + + uint elapsedMsSincePreviousBlockingAdjustmentDelay = + (uint)(currentTimeMs - _previousBlockingAdjustmentDelayStartTimeMs); + return elapsedMsSincePreviousBlockingAdjustmentDelay >= _previousBlockingAdjustmentDelayMs; + } + } } internal static void EnsureGateThreadRunning() => GateThread.EnsureRunning(ThreadPoolInstance); diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.HillClimbing.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.HillClimbing.cs index aead56b38acabf..88190c6e480aa9 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.HillClimbing.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.HillClimbing.cs @@ -33,6 +33,7 @@ public enum StateOrTransition Stabilizing, Starvation, ThreadTimedOut, + CooperativeBlocking, } // SOS's ThreadPool command depends on the names of all fields @@ -321,10 +322,12 @@ public HillClimbing() newThreadWaveMagnitude = Math.Max(newThreadWaveMagnitude, 1); // - // Make sure our control setting is within the ThreadPool's limits + // Make sure our control setting is within the ThreadPool's limits. When some threads are blocked due to + // cooperative blocking, ensure that hill climbing does not decrease the thread count below the expected + // minimum. // int maxThreads = threadPoolInstance._maxThreads; - int minThreads = threadPoolInstance._minThreads; + int minThreads = threadPoolInstance.MinThreadsGoal; _currentControlSetting = Math.Min(maxThreads - newThreadWaveMagnitude, _currentControlSetting); _currentControlSetting = Math.Max(minThreads, _currentControlSetting); @@ -374,10 +377,20 @@ public HillClimbing() return (newThreadCount, newSampleInterval); } - private void ChangeThreadCount(int newThreadCount, StateOrTransition state) + private void ChangeThreadCount(int newThreadCount, StateOrTransition state, bool logTransition = true) { _lastThreadCount = newThreadCount; - _currentSampleMs = _randomIntervalGenerator.Next(_sampleIntervalMsLow, _sampleIntervalMsHigh + 1); + + if (state != StateOrTransition.CooperativeBlocking) // this can be noisy + { + _currentSampleMs = _randomIntervalGenerator.Next(_sampleIntervalMsLow, _sampleIntervalMsHigh + 1); + } + + if (!logTransition) + { + return; + } + double throughput = _secondsElapsedSinceLastChange > 0 ? _completionsSinceLastChange / _secondsElapsedSinceLastChange : 0; LogTransition(newThreadCount, throughput, state); _secondsElapsedSinceLastChange = 0; @@ -414,12 +427,12 @@ private void LogTransition(int newThreadCount, double throughput, StateOrTransit } } - public void ForceChange(int newThreadCount, StateOrTransition state) + public void ForceChange(int newThreadCount, StateOrTransition state, bool logTransition = true) { if (_lastThreadCount != newThreadCount) { _currentControlSetting += newThreadCount - _lastThreadCount; - ChangeThreadCount(newThreadCount, state); + ChangeThreadCount(newThreadCount, state, logTransition); } } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs index 1c0c121847387c..ae2efb6c7761d4 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs @@ -16,15 +16,14 @@ private struct ThreadCounts // SOS's ThreadPool command depends on this layout private const byte NumProcessingWorkShift = 0; private const byte NumExistingThreadsShift = 16; - private const byte NumThreadsGoalShift = 32; - private ulong _data; // SOS's ThreadPool command depends on this name + private uint _data; // SOS's ThreadPool command depends on this name - private ThreadCounts(ulong data) => _data = data; + private ThreadCounts(uint data) => _data = data; private short GetInt16Value(byte shift) => (short)(_data >> shift); private void SetInt16Value(short value, byte shift) => - _data = (_data & ~((ulong)ushort.MaxValue << shift)) | ((ulong)(ushort)value << shift); + _data = (_data & ~((uint)ushort.MaxValue << shift)) | ((uint)(ushort)value << shift); /// /// Number of threads processing work items. @@ -44,7 +43,13 @@ public void SubtractNumProcessingWork(short value) Debug.Assert(value >= 0); Debug.Assert(value <= NumProcessingWork); - _data -= (ulong)(ushort)value << NumProcessingWorkShift; + _data -= (uint)(ushort)value << NumProcessingWorkShift; + } + + public void InterlockedDecrementNumProcessingWork() + { + Debug.Assert(NumProcessingWorkShift == 0); + Interlocked.Decrement(ref _data); } /// @@ -65,20 +70,7 @@ public void SubtractNumExistingThreads(short value) Debug.Assert(value >= 0); Debug.Assert(value <= NumExistingThreads); - _data -= (ulong)(ushort)value << NumExistingThreadsShift; - } - - /// - /// Max possible thread pool threads we want to have. - /// - public short NumThreadsGoal - { - get => GetInt16Value(NumThreadsGoalShift); - set - { - Debug.Assert(value > 0); - SetInt16Value(value, NumThreadsGoalShift); - } + _data -= (uint)(ushort)value << NumExistingThreadsShift; } public ThreadCounts VolatileRead() => new ThreadCounts(Volatile.Read(ref _data)); @@ -90,7 +82,7 @@ public ThreadCounts InterlockedCompareExchange(ThreadCounts newCounts, ThreadCou public static bool operator !=(ThreadCounts lhs, ThreadCounts rhs) => lhs._data != rhs._data; public override bool Equals([NotNullWhen(true)] object? obj) => obj is ThreadCounts other && _data == other._data; - public override int GetHashCode() => (int)_data + (int)(_data >> 32); + public override int GetHashCode() => (int)_data; } } } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs index 917a1113f59630..8b1a44946d10f5 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs @@ -12,6 +12,11 @@ internal sealed partial class PortableThreadPool /// private static class WorkerThread { + // This value represents an assumption of how much uncommited stack space a worker thread may use in the future. + // Used in calculations to estimate when to throttle the rate of thread injection to reduce the possibility of + // preexisting threads from running out of memory when using new stack space in low-memory situations. + public const int EstimatedAdditionalStackUsagePerThreadBytes = 64 << 10; // 64 KB + /// /// Semaphore for controlling how many threads are currently working. /// @@ -43,7 +48,7 @@ private static void WorkerThreadStart() (uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads); } - LowLevelLock hillClimbingThreadAdjustmentLock = threadPoolInstance._hillClimbingThreadAdjustmentLock; + LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock; LowLevelLifoSemaphore semaphore = s_semaphore; while (true) @@ -54,7 +59,7 @@ private static void WorkerThreadStart() bool alreadyRemovedWorkingWorker = false; while (TakeActiveRequest(threadPoolInstance)) { - Volatile.Write(ref threadPoolInstance._separated.lastDequeueTime, Environment.TickCount); + threadPoolInstance._separated.lastDequeueTime = Environment.TickCount; if (!ThreadPoolWorkQueue.Dispatch()) { // ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have @@ -96,14 +101,14 @@ private static void WorkerThreadStart() } } - hillClimbingThreadAdjustmentLock.Acquire(); + threadAdjustmentLock.Acquire(); try { // At this point, the thread's wait timed out. We are shutting down this thread. // We are going to decrement the number of exisiting threads to no longer include this one // and then change the max number of threads in the thread pool to reflect that we don't need as many // as we had. Finally, we are going to tell hill climbing that we changed the max number of threads. - ThreadCounts counts = threadPoolInstance._separated.counts.VolatileRead(); + ThreadCounts counts = threadPoolInstance._separated.counts; while (true) { // Since this thread is currently registered as an existing thread, if more work comes in meanwhile, @@ -119,13 +124,21 @@ private static void WorkerThreadStart() ThreadCounts newCounts = counts; newCounts.SubtractNumExistingThreads(1); short newNumExistingThreads = (short)(numExistingThreads - 1); - short newNumThreadsGoal = Math.Max(threadPoolInstance._minThreads, Math.Min(newNumExistingThreads, newCounts.NumThreadsGoal)); - newCounts.NumThreadsGoal = newNumThreadsGoal; ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts); if (oldCounts == counts) { - HillClimbing.ThreadPoolHillClimber.ForceChange(newNumThreadsGoal, HillClimbing.StateOrTransition.ThreadTimedOut); + short newNumThreadsGoal = + Math.Max( + threadPoolInstance.MinThreadsGoal, + Math.Min(newNumExistingThreads, threadPoolInstance._separated.numThreadsGoal)); + if (threadPoolInstance._separated.numThreadsGoal != newNumThreadsGoal) + { + threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal; + HillClimbing.ThreadPoolHillClimber.ForceChange( + newNumThreadsGoal, + HillClimbing.StateOrTransition.ThreadTimedOut); + } if (NativeRuntimeEventSource.Log.IsEnabled()) { @@ -139,7 +152,7 @@ private static void WorkerThreadStart() } finally { - hillClimbingThreadAdjustmentLock.Release(); + threadAdjustmentLock.Release(); } } } @@ -149,19 +162,7 @@ private static void WorkerThreadStart() /// private static void RemoveWorkingWorker(PortableThreadPool threadPoolInstance) { - ThreadCounts currentCounts = threadPoolInstance._separated.counts.VolatileRead(); - while (true) - { - ThreadCounts newCounts = currentCounts; - newCounts.SubtractNumProcessingWork(1); - ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, currentCounts); - - if (oldCounts == currentCounts) - { - break; - } - currentCounts = oldCounts; - } + threadPoolInstance._separated.counts.InterlockedDecrementNumProcessingWork(); // It's possible that we decided we had thread requests just before a request came in, // but reduced the worker count *after* the request came in. In this case, we might @@ -175,12 +176,12 @@ private static void RemoveWorkingWorker(PortableThreadPool threadPoolInstance) internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance) { - ThreadCounts counts = threadPoolInstance._separated.counts.VolatileRead(); + ThreadCounts counts = threadPoolInstance._separated.counts; short numExistingThreads, numProcessingWork, newNumExistingThreads, newNumProcessingWork; while (true) { numProcessingWork = counts.NumProcessingWork; - if (numProcessingWork >= counts.NumThreadsGoal) + if (numProcessingWork >= threadPoolInstance._separated.numThreadsGoal) { return; } @@ -219,7 +220,7 @@ internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance continue; } - counts = threadPoolInstance._separated.counts.VolatileRead(); + counts = threadPoolInstance._separated.counts; while (true) { ThreadCounts newCounts = counts; @@ -245,17 +246,17 @@ internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance /// Whether or not this thread should stop processing work even if there is still work in the queue. internal static bool ShouldStopProcessingWorkNow(PortableThreadPool threadPoolInstance) { - ThreadCounts counts = threadPoolInstance._separated.counts.VolatileRead(); + ThreadCounts counts = threadPoolInstance._separated.counts; while (true) { - // When there are more threads processing work than the thread count goal, hill climbing must have decided + // When there are more threads processing work than the thread count goal, it may have been decided // to decrease the number of threads. Stop processing if the counts can be updated. We may have more // threads existing than the thread count goal and that is ok, the cold ones will eventually time out if // the thread count goal is not increased again. This logic is a bit different from the original CoreCLR // code from which this implementation was ported, which turns a processing thread into a retired thread // and checks for pending requests like RemoveWorkingWorker. In this implementation there are // no retired threads, so only the count of threads processing work is considered. - if (counts.NumProcessingWork <= counts.NumThreadsGoal) + if (counts.NumProcessingWork <= threadPoolInstance._separated.numThreadsGoal) { return false; } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs index b18e963f360662..d79c0684557da8 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs @@ -28,8 +28,10 @@ internal sealed partial class PortableThreadPool private const int CpuUtilizationHigh = 95; private const int CpuUtilizationLow = 80; - private static readonly short s_forcedMinWorkerThreads = AppContextConfigHelper.GetInt16Config("System.Threading.ThreadPool.MinThreads", 0, false); - private static readonly short s_forcedMaxWorkerThreads = AppContextConfigHelper.GetInt16Config("System.Threading.ThreadPool.MaxThreads", 0, false); + private static readonly short ForcedMinWorkerThreads = + AppContextConfigHelper.GetInt16Config("System.Threading.ThreadPool.MinThreads", 0, false); + private static readonly short ForcedMaxWorkerThreads = + AppContextConfigHelper.GetInt16Config("System.Threading.ThreadPool.MaxThreads", 0, false); [ThreadStatic] private static object? t_completionCountObject; @@ -43,43 +45,55 @@ internal sealed partial class PortableThreadPool private int _cpuUtilization; // SOS's ThreadPool command depends on this name private short _minThreads; private short _maxThreads; - private readonly LowLevelLock _maxMinThreadLock = new LowLevelLock(); [StructLayout(LayoutKind.Explicit, Size = Internal.PaddingHelpers.CACHE_LINE_SIZE * 6)] private struct CacheLineSeparated { [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1)] public ThreadCounts counts; // SOS's ThreadPool command depends on this name + [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1 + sizeof(uint))] + public short numThreadsGoal; + [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 2)] public int lastDequeueTime; + [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 3)] public int priorCompletionCount; [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 3 + sizeof(int))] public int priorCompletedWorkRequestsTime; [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 3 + sizeof(int) * 2)] public int nextCompletedWorkRequestsTime; + [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 4)] public volatile int numRequestedWorkers; - [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 5)] + [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 4 + sizeof(int))] public int gateThreadRunningState; } - private CacheLineSeparated _separated; // SOS's ThreadPool command depends on this name private long _currentSampleStartTime; private readonly ThreadInt64PersistentCounter _completionCounter = new ThreadInt64PersistentCounter(); private int _threadAdjustmentIntervalMs; - private readonly LowLevelLock _hillClimbingThreadAdjustmentLock = new LowLevelLock(); + private short _numBlockedThreads; + private short _numThreadsAddedDueToBlocking; + private PendingBlockingAdjustment _pendingBlockingAdjustment; + + private long _memoryUsageBytes; + private long _memoryLimitBytes; + + private readonly LowLevelLock _threadAdjustmentLock = new LowLevelLock(); + + private CacheLineSeparated _separated; // SOS's ThreadPool command depends on this name private PortableThreadPool() { - _minThreads = s_forcedMinWorkerThreads > 0 ? s_forcedMinWorkerThreads : (short)Environment.ProcessorCount; + _minThreads = ForcedMinWorkerThreads > 0 ? ForcedMinWorkerThreads : (short)Environment.ProcessorCount; if (_minThreads > MaxPossibleThreadCount) { _minThreads = MaxPossibleThreadCount; } - _maxThreads = s_forcedMaxWorkerThreads > 0 ? s_forcedMaxWorkerThreads : DefaultMaxWorkerThreadCount; + _maxThreads = ForcedMaxWorkerThreads > 0 ? ForcedMaxWorkerThreads : DefaultMaxWorkerThreadCount; if (_maxThreads > MaxPossibleThreadCount) { _maxThreads = MaxPossibleThreadCount; @@ -89,13 +103,7 @@ private PortableThreadPool() _maxThreads = _minThreads; } - _separated = new CacheLineSeparated - { - counts = new ThreadCounts - { - NumThreadsGoal = _minThreads - } - }; + _separated.numThreadsGoal = _minThreads; } public bool SetMinThreads(int workerThreads, int ioCompletionThreads) @@ -105,7 +113,10 @@ public bool SetMinThreads(int workerThreads, int ioCompletionThreads) return false; } - _maxMinThreadLock.Acquire(); + bool addWorker = false; + bool wakeGateThread = false; + + _threadAdjustmentLock.Acquire(); try { if (workerThreads > _maxThreads || !ThreadPool.CanSetMinIOCompletionThreads(ioCompletionThreads)) @@ -115,39 +126,49 @@ public bool SetMinThreads(int workerThreads, int ioCompletionThreads) ThreadPool.SetMinIOCompletionThreads(ioCompletionThreads); - if (s_forcedMinWorkerThreads != 0) + if (ForcedMinWorkerThreads != 0) { return true; } short newMinThreads = (short)Math.Max(1, Math.Min(workerThreads, MaxPossibleThreadCount)); _minThreads = newMinThreads; - - ThreadCounts counts = _separated.counts.VolatileRead(); - while (counts.NumThreadsGoal < newMinThreads) + if (_numBlockedThreads > 0) { - ThreadCounts newCounts = counts; - newCounts.NumThreadsGoal = newMinThreads; - - ThreadCounts oldCounts = _separated.counts.InterlockedCompareExchange(newCounts, counts); - if (oldCounts == counts) + // Blocking adjustment will adjust the goal according to its heuristics + if (_pendingBlockingAdjustment != PendingBlockingAdjustment.Immediately) { + _pendingBlockingAdjustment = PendingBlockingAdjustment.Immediately; + wakeGateThread = true; + } + } + else + { + Debug.Assert(newMinThreads == MinThreadsGoal); + if (_separated.numThreadsGoal < newMinThreads) + { + _separated.numThreadsGoal = newMinThreads; if (_separated.numRequestedWorkers > 0) { - WorkerThread.MaybeAddWorkingWorker(this); + addWorker = true; } - break; } - - counts = oldCounts; } - - return true; } finally { - _maxMinThreadLock.Release(); + _threadAdjustmentLock.Release(); + } + + if (addWorker) + { + WorkerThread.MaybeAddWorkingWorker(this); + } + else if (wakeGateThread) + { + GateThread.Wake(this); } + return true; } public int GetMinThreads() => Volatile.Read(ref _minThreads); @@ -159,7 +180,7 @@ public bool SetMaxThreads(int workerThreads, int ioCompletionThreads) return false; } - _maxMinThreadLock.Acquire(); + _threadAdjustmentLock.Acquire(); try { if (workerThreads < _minThreads || !ThreadPool.CanSetMaxIOCompletionThreads(ioCompletionThreads)) @@ -169,34 +190,22 @@ public bool SetMaxThreads(int workerThreads, int ioCompletionThreads) ThreadPool.SetMaxIOCompletionThreads(ioCompletionThreads); - if (s_forcedMaxWorkerThreads != 0) + if (ForcedMaxWorkerThreads != 0) { return true; } short newMaxThreads = (short)Math.Min(workerThreads, MaxPossibleThreadCount); _maxThreads = newMaxThreads; - - ThreadCounts counts = _separated.counts.VolatileRead(); - while (counts.NumThreadsGoal > newMaxThreads) + if (_separated.numThreadsGoal > newMaxThreads) { - ThreadCounts newCounts = counts; - newCounts.NumThreadsGoal = newMaxThreads; - - ThreadCounts oldCounts = _separated.counts.InterlockedCompareExchange(newCounts, counts); - if (oldCounts == counts) - { - break; - } - - counts = oldCounts; + _separated.numThreadsGoal = newMaxThreads; } - return true; } finally { - _maxMinThreadLock.Release(); + _threadAdjustmentLock.Release(); } } @@ -232,7 +241,7 @@ private object CreateThreadLocalCompletionCountObject() private void NotifyWorkItemProgress(object threadLocalCompletionCountObject, int currentTimeMs) { ThreadInt64PersistentCounter.Increment(threadLocalCompletionCountObject); - Volatile.Write(ref _separated.lastDequeueTime, Environment.TickCount); + _separated.lastDequeueTime = currentTimeMs; if (ShouldAdjustMaxWorkersActive(currentTimeMs)) { @@ -257,15 +266,23 @@ internal bool NotifyWorkItemComplete(object? threadLocalCompletionCountObject, i // private void AdjustMaxWorkersActive() { - LowLevelLock hillClimbingThreadAdjustmentLock = _hillClimbingThreadAdjustmentLock; - if (!hillClimbingThreadAdjustmentLock.TryAcquire()) + LowLevelLock threadAdjustmentLock = _threadAdjustmentLock; + if (!threadAdjustmentLock.TryAcquire()) { // The lock is held by someone else, they will take care of this for us return; } + bool addWorker = false; try { + // Skip hill climbing when there is a pending blocking adjustment. Hill climbing may otherwise bypass the + // blocking adjustment heuristics and increase the thread count too quickly. + if (_pendingBlockingAdjustment != PendingBlockingAdjustment.None) + { + return; + } + long startTime = _currentSampleStartTime; long endTime = Stopwatch.GetTimestamp(); long freq = Stopwatch.Frequency; @@ -278,39 +295,24 @@ private void AdjustMaxWorkersActive() int totalNumCompletions = (int)_completionCounter.Count; int numCompletions = totalNumCompletions - _separated.priorCompletionCount; - ThreadCounts currentCounts = _separated.counts.VolatileRead(); - int newMax; - (newMax, _threadAdjustmentIntervalMs) = HillClimbing.ThreadPoolHillClimber.Update(currentCounts.NumThreadsGoal, elapsedSeconds, numCompletions); - - while (newMax != currentCounts.NumThreadsGoal) + int newNumThreadsGoal; + (newNumThreadsGoal, _threadAdjustmentIntervalMs) = + HillClimbing.ThreadPoolHillClimber.Update(_separated.numThreadsGoal, elapsedSeconds, numCompletions); + short oldNumThreadsGoal = _separated.numThreadsGoal; + if (oldNumThreadsGoal != (short)newNumThreadsGoal) { - ThreadCounts newCounts = currentCounts; - newCounts.NumThreadsGoal = (short)newMax; - - ThreadCounts oldCounts = _separated.counts.InterlockedCompareExchange(newCounts, currentCounts); - if (oldCounts == currentCounts) - { - // - // If we're increasing the max, inject a thread. If that thread finds work, it will inject - // another thread, etc., until nobody finds work or we reach the new maximum. - // - // If we're reducing the max, whichever threads notice this first will sleep and timeout themselves. - // - if (newMax > oldCounts.NumThreadsGoal) - { - WorkerThread.MaybeAddWorkingWorker(this); - } - break; - } - - if (oldCounts.NumThreadsGoal > currentCounts.NumThreadsGoal && oldCounts.NumThreadsGoal >= newMax) + _separated.numThreadsGoal = (short)newNumThreadsGoal; + + // + // If we're increasing the goal, inject a thread. If that thread finds work, it will inject + // another thread, etc., until nobody finds work or we reach the new goal. + // + // If we're reducing the goal, whichever threads notice this first will sleep and timeout themselves. + // + if (newNumThreadsGoal > oldNumThreadsGoal) { - // someone (probably the gate thread) increased the thread count more than - // we are about to do. Don't interfere. - break; + addWorker = true; } - - currentCounts = oldCounts; } _separated.priorCompletionCount = totalNumCompletions; @@ -321,32 +323,49 @@ private void AdjustMaxWorkersActive() } finally { - hillClimbingThreadAdjustmentLock.Release(); + threadAdjustmentLock.Release(); + } + + if (addWorker) + { + WorkerThread.MaybeAddWorkingWorker(this); } } private bool ShouldAdjustMaxWorkersActive(int currentTimeMs) { + if (HillClimbing.IsDisabled) + { + return false; + } + // We need to subtract by prior time because Environment.TickCount can wrap around, making a comparison of absolute // times unreliable. Intervals are unsigned to avoid wrapping around on the subtract after enough time elapses, and // this also prevents the initial elapsed interval from being negative due to the prior and next times being // initialized to zero. int priorTime = Volatile.Read(ref _separated.priorCompletedWorkRequestsTime); - uint requiredInterval = (uint)(_separated.nextCompletedWorkRequestsTime - priorTime); - uint elapsedInterval = (uint)(currentTimeMs - priorTime); - if (elapsedInterval >= requiredInterval) + int requiredInterval = _separated.nextCompletedWorkRequestsTime - priorTime; + int elapsedInterval = currentTimeMs - priorTime; + if (elapsedInterval < requiredInterval) + { + return false; + } + + // Avoid trying to adjust the thread count goal if there are already more threads than the thread count goal. + // In that situation, hill climbing must have previously decided to decrease the thread count goal, so let's + // wait until the system responds to that change before calling into hill climbing again. This condition should + // be the opposite of the condition in WorkerThread.ShouldStopProcessingWorkNow that causes + // threads processing work to stop in response to a decreased thread count goal. The logic here is a bit + // different from the original CoreCLR code from which this implementation was ported because in this + // implementation there are no retired threads, so only the count of threads processing work is considered. + if (_separated.counts.NumProcessingWork > _separated.numThreadsGoal) { - // Avoid trying to adjust the thread count goal if there are already more threads than the thread count goal. - // In that situation, hill climbing must have previously decided to decrease the thread count goal, so let's - // wait until the system responds to that change before calling into hill climbing again. This condition should - // be the opposite of the condition in WorkerThread.ShouldStopProcessingWorkNow that causes - // threads processing work to stop in response to a decreased thread count goal. The logic here is a bit - // different from the original CoreCLR code from which this implementation was ported because in this - // implementation there are no retired threads, so only the count of threads processing work is considered. - ThreadCounts counts = _separated.counts.VolatileRead(); - return counts.NumProcessingWork <= counts.NumThreadsGoal && !HillClimbing.IsDisabled; + return false; } - return false; + + // Skip hill climbing when there is a pending blocking adjustment. Hill climbing may otherwise bypass the + // blocking adjustment heuristics and increase the thread count too quickly. + return _pendingBlockingAdjustment == PendingBlockingAdjustment.None; } internal void RequestWorker() @@ -357,5 +376,16 @@ internal void RequestWorker() WorkerThread.MaybeAddWorkingWorker(this); GateThread.EnsureRunning(this); } + + private bool OnGen2GCCallback() + { + // Gen 2 GCs may be very infrequent in some cases. If it becomes an issue, consider updating the memory usage more + // frequently. The memory usage is only used for fallback purposes in blocking adjustment, so an artifically higher + // memory usage may cause blocking adjustment to fall back to slower adjustments sooner than necessary. + GCMemoryInfo gcMemoryInfo = GC.GetGCMemoryInfo(); + _memoryLimitBytes = gcMemoryInfo.HighMemoryLoadThresholdBytes; + _memoryUsageBytes = Math.Min(gcMemoryInfo.MemoryLoadBytes, gcMemoryInfo.HighMemoryLoadThresholdBytes); + return true; // continue receiving gen 2 GC callbacks + } } } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs index 53f0aebf2db204..a638b65e1d083b 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs @@ -2973,14 +2973,30 @@ private bool SpinThenBlockingWait(int millisecondsTimeout, CancellationToken can #pragma warning disable CA1416 // Validate platform compatibility, issue: https://github.com/dotnet/runtime/issues/44622 if (infiniteWait) { - returnValue = mres.Wait(Timeout.Infinite, cancellationToken); + ThreadPool.NotifyThreadBlocked(); + try + { + returnValue = mres.Wait(Timeout.Infinite, cancellationToken); + } + finally + { + ThreadPool.NotifyThreadUnblocked(); + } } else { uint elapsedTimeTicks = ((uint)Environment.TickCount) - startTimeTicks; if (elapsedTimeTicks < millisecondsTimeout) { - returnValue = mres.Wait((int)(millisecondsTimeout - elapsedTimeTicks), cancellationToken); + ThreadPool.NotifyThreadBlocked(); + try + { + returnValue = mres.Wait((int)(millisecondsTimeout - elapsedTimeTicks), cancellationToken); + } + finally + { + ThreadPool.NotifyThreadUnblocked(); + } } } #pragma warning restore CA1416 diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Portable.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Portable.cs index 08067113171c0e..a079b81713456c 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Portable.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Portable.cs @@ -100,6 +100,9 @@ public static void GetAvailableThreads(out int workerThreads, out int completion internal static bool NotifyWorkItemComplete(object? threadLocalCompletionCountObject, int currentTimeMs) => PortableThreadPool.ThreadPoolInstance.NotifyWorkItemComplete(threadLocalCompletionCountObject, currentTimeMs); + internal static void NotifyThreadBlocked() => PortableThreadPool.ThreadPoolInstance.NotifyThreadBlocked(); + internal static void NotifyThreadUnblocked() => PortableThreadPool.ThreadPoolInstance.NotifyThreadUnblocked(); + internal static object GetOrCreateThreadLocalCompletionCountObject() => PortableThreadPool.ThreadPoolInstance.GetOrCreateThreadLocalCompletionCountObject(); diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index f08e4ba4d7fe0a..be02240aff84f2 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -725,7 +725,12 @@ internal static bool Dispatch() // int currentTickCount = Environment.TickCount; if (!ThreadPool.NotifyWorkItemComplete(threadLocalCompletionCountObject, currentTickCount)) + { + // This thread is being parked and may remain inactive for a while. Transfer any thread-local work items + // to ensure that they would not be heavily delayed. + tl.TransferLocalWork(); return false; + } // Check if the dispatch quantum has expired if ((uint)(currentTickCount - startTickCount) < DispatchQuantumMs) @@ -824,6 +829,16 @@ public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq) threadLocalCompletionCountObject = ThreadPool.GetOrCreateThreadLocalCompletionCountObject(); } + public void TransferLocalWork() + { + object? cb; + while ((cb = workStealingQueue.LocalPop()) != null) + { + Debug.Assert(null != cb); + workQueue.Enqueue(cb, forceGlobal: true); + } + } + ~ThreadPoolWorkQueueThreadLocals() { // Transfer any pending workitems into the global queue so that they will be executed by another thread @@ -831,12 +846,7 @@ public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq) { if (null != workQueue) { - object? cb; - while ((cb = workStealingQueue.LocalPop()) != null) - { - Debug.Assert(null != cb); - workQueue.Enqueue(cb, forceGlobal: true); - } + TransferLocalWork(); } ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue); From 726dc9ee30fd661355ff461cb42b08dde01fabae Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Sat, 29 May 2021 19:14:01 -0700 Subject: [PATCH 02/12] Fix browser build --- .../src/System/Threading/ThreadPool.Browser.Mono.cs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Mono.cs b/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Mono.cs index df6112fa3696c3..2ca288fc8f5187 100644 --- a/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Mono.cs +++ b/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Mono.cs @@ -90,6 +90,14 @@ internal static bool NotifyWorkItemComplete(object? threadLocalCompletionCountOb return true; } + internal static void NotifyThreadBlocked() + { + } + + internal static void NotifyThreadUnblocked() + { + } + internal static object? GetOrCreateThreadLocalCompletionCountObject() => null; private static RegisteredWaitHandle RegisterWaitForSingleObject( From 96cc50ea49f1ee220b41180cb838052184afa67d Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Sun, 30 May 2021 00:45:53 -0700 Subject: [PATCH 03/12] Remove an invalid assertion --- .../src/System/Threading/PortableThreadPool.cs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs index d79c0684557da8..5204c98df6bc4e 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs @@ -142,16 +142,12 @@ public bool SetMinThreads(int workerThreads, int ioCompletionThreads) wakeGateThread = true; } } - else + else if (_separated.numThreadsGoal < newMinThreads) { - Debug.Assert(newMinThreads == MinThreadsGoal); - if (_separated.numThreadsGoal < newMinThreads) + _separated.numThreadsGoal = newMinThreads; + if (_separated.numRequestedWorkers > 0) { - _separated.numThreadsGoal = newMinThreads; - if (_separated.numRequestedWorkers > 0) - { - addWorker = true; - } + addWorker = true; } } } From f3e8a31958e42b230f6751d7c246e59d7f3e9d4c Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Sun, 30 May 2021 02:40:54 -0700 Subject: [PATCH 04/12] Fix bad merge --- .../src/System/Threading/PortableThreadPool.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs index 5204c98df6bc4e..8c6163fa2f7d2d 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs @@ -340,8 +340,8 @@ private bool ShouldAdjustMaxWorkersActive(int currentTimeMs) // this also prevents the initial elapsed interval from being negative due to the prior and next times being // initialized to zero. int priorTime = Volatile.Read(ref _separated.priorCompletedWorkRequestsTime); - int requiredInterval = _separated.nextCompletedWorkRequestsTime - priorTime; - int elapsedInterval = currentTimeMs - priorTime; + uint requiredInterval = (uint)(_separated.nextCompletedWorkRequestsTime - priorTime); + uint elapsedInterval = (uint)(currentTimeMs - priorTime); if (elapsedInterval < requiredInterval) { return false; From d3a977b60df811bc18c81d7225a14af9956fe390 Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Sun, 30 May 2021 11:25:16 -0700 Subject: [PATCH 05/12] Update event enum with new thread adjustment reason --- .../Threading/NativeRuntimeEventSource.PortableThreadPool.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/NativeRuntimeEventSource.PortableThreadPool.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/NativeRuntimeEventSource.PortableThreadPool.cs index 2dcce9f59a0731..785000089bdaaf 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/NativeRuntimeEventSource.PortableThreadPool.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/NativeRuntimeEventSource.PortableThreadPool.cs @@ -66,7 +66,8 @@ public enum ThreadAdjustmentReasonMap : uint ChangePoint, Stabilizing, Starvation, - ThreadTimedOut + ThreadTimedOut, + CooperativeBlocking, } #if !ES_BUILD_STANDALONE From c2cb717c4227274b51140c1c715b1316fc934d8b Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Sun, 30 May 2021 12:10:08 -0700 Subject: [PATCH 06/12] Log all of the transitions for now (will change later when needed), and fix throughput numbers sent in events --- .../Threading/PortableThreadPool.Blocking.cs | 6 ++---- .../PortableThreadPool.HillClimbing.cs | 17 +++++++---------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs index 7065353338e796..151a93d3972c36 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs @@ -168,8 +168,7 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo _separated.numThreadsGoal = numThreadsGoal -= toSubtract; HillClimbing.ThreadPoolHillClimber.ForceChange( numThreadsGoal, - HillClimbing.StateOrTransition.CooperativeBlocking, - logTransition: false); // reduce noise + HillClimbing.StateOrTransition.CooperativeBlocking); return 0; } @@ -254,8 +253,7 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo _separated.numThreadsGoal = newNumThreadsGoal; HillClimbing.ThreadPoolHillClimber.ForceChange( newNumThreadsGoal, - HillClimbing.StateOrTransition.CooperativeBlocking, - logTransition: newNumThreadsGoal == maxThreadsGoalWithoutDelay || previousDelayElapsed); // reduce noise + HillClimbing.StateOrTransition.CooperativeBlocking); if (counts.NumProcessingWork >= numThreadsGoal && _separated.numRequestedWorkers > 0) { addWorker = true; diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.HillClimbing.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.HillClimbing.cs index 88190c6e480aa9..3a2b10096887d0 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.HillClimbing.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.HillClimbing.cs @@ -358,7 +358,11 @@ public HillClimbing() // If all of this caused an actual change in thread count, log that as well. // if (newThreadCount != currentThreadCount) + { ChangeThreadCount(newThreadCount, state); + _secondsElapsedSinceLastChange = 0; + _completionsSinceLastChange = 0; + } // // Return the new thread count and sample interval. This is randomized to prevent correlations with other periodic @@ -377,7 +381,7 @@ public HillClimbing() return (newThreadCount, newSampleInterval); } - private void ChangeThreadCount(int newThreadCount, StateOrTransition state, bool logTransition = true) + private void ChangeThreadCount(int newThreadCount, StateOrTransition state) { _lastThreadCount = newThreadCount; @@ -386,15 +390,8 @@ private void ChangeThreadCount(int newThreadCount, StateOrTransition state, bool _currentSampleMs = _randomIntervalGenerator.Next(_sampleIntervalMsLow, _sampleIntervalMsHigh + 1); } - if (!logTransition) - { - return; - } - double throughput = _secondsElapsedSinceLastChange > 0 ? _completionsSinceLastChange / _secondsElapsedSinceLastChange : 0; LogTransition(newThreadCount, throughput, state); - _secondsElapsedSinceLastChange = 0; - _completionsSinceLastChange = 0; } private void LogTransition(int newThreadCount, double throughput, StateOrTransition stateOrTransition) @@ -427,12 +424,12 @@ private void LogTransition(int newThreadCount, double throughput, StateOrTransit } } - public void ForceChange(int newThreadCount, StateOrTransition state, bool logTransition = true) + public void ForceChange(int newThreadCount, StateOrTransition state) { if (_lastThreadCount != newThreadCount) { _currentControlSetting += newThreadCount - _lastThreadCount; - ChangeThreadCount(newThreadCount, state, logTransition); + ChangeThreadCount(newThreadCount, state); } } From 20526d62afa9b19ef54534fead214ebbf2dd1ec6 Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Sun, 30 May 2021 13:24:46 -0700 Subject: [PATCH 07/12] Add a test for some coverage --- .../tests/ThreadPoolTests.cs | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs index fbd515aecb7f47..5a912a1c5c3cef 100644 --- a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs +++ b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs @@ -885,6 +885,71 @@ public static void ThreadPoolThreadCreationDoesNotTransferExecutionContext() }).Dispose(); } + [ConditionalFact(nameof(IsThreadingAndRemoteExecutorSupported))] + public static void CooperativeBlockingCanCreateThreadsFaster() + { + // Run in a separate process to test in a clean thread pool environment such that work items queued by the test + // would cause the thread pool to create threads + RemoteExecutor.Invoke(() => + { + // All but the last of these work items will block and the last queued work item would release the blocking. Without + // cooperative blocking, this would lead to starvation after work items run. Since starvation adds threads + // at a rate of at most 2 per second, the extra 120 work items would take roughly 60 seconds to get unblocked and since + // the test waits for 30 seconds it would time out. Cooperative blocking is configured below to ensure that starvation + // would not be hit for this many work items, and to increase the rate of thread injection for testing purposes while + // getting a decent amount of coverage for its behavior. With cooperative blocking as configured below, the test should + // finish within a few seconds. + int processorCount = Environment.ProcessorCount; + int workItemCount = processorCount + 120; + int workItemCount_procCountFactor = workItemCount / processorCount; + workItemCount = workItemCount_procCountFactor * processorCount; + SetBlockingConfigValue("ThreadsToAddWithoutDelay_ProcCountFactor", 1); + SetBlockingConfigValue("MaxThreadsToAddBeforeFallback_ProcCountFactor", workItemCount_procCountFactor); + SetBlockingConfigValue("MaxDelayUntilFallbackMs", 1); + + var allWorkItemsUnblocked = new AutoResetEvent(false); + + // Run a second iteration for some extra coverage. Iterations after the first one would be much faster because the + // necessary number of threads would already have been created by then, and would not add much to the test time. + for (int iterationIndex = 0; iterationIndex < 2; ++iterationIndex) + { + var tcs = new TaskCompletionSource(); + int unblockedThreadCount = 0; + + Action blockingWorkItem = _ => + { + tcs.Task.Wait(); + if (Interlocked.Increment(ref unblockedThreadCount) == workItemCount - 1) + { + allWorkItemsUnblocked.Set(); + } + }; + + for (int i = 0; i < workItemCount - 1; ++i) + { + ThreadPool.UnsafeQueueUserWorkItem(blockingWorkItem, 0, preferLocal: false); + } + + Action unblockingWorkItem = _ => tcs.SetResult(0); + ThreadPool.UnsafeQueueUserWorkItem(unblockingWorkItem, 0, preferLocal: false); + Assert.True(allWorkItemsUnblocked.WaitOne(30_000)); + } + + void SetBlockingConfigValue(string name, int value) => + AppContextSetData("System.Threading.ThreadPool.Blocking." + name, value); + + void AppContextSetData(string name, object value) + { + typeof(AppContext).InvokeMember( + "SetData", + BindingFlags.ExactBinding | BindingFlags.InvokeMethod | BindingFlags.Public | BindingFlags.Static, + null, + null, + new object[] { name, value }); + } + }).Dispose(); + } + public static bool IsThreadingAndRemoteExecutorSupported => PlatformDetection.IsThreadingSupported && RemoteExecutor.IsSupported; } From 984888fad5e831ee7238411f494ce99699de13a5 Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Mon, 31 May 2021 01:06:12 -0700 Subject: [PATCH 08/12] Fix test --- .../System.Threading.ThreadPool/tests/ThreadPoolTests.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs index 5a912a1c5c3cef..3ec4640acf8558 100644 --- a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs +++ b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Linq; +using System.Reflection; using System.Threading.Tasks; using System.Threading.Tests; using Microsoft.DotNet.RemoteExecutor; From 56bd4d94097f5653b445afd3a11086dd10369281 Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Wed, 2 Jun 2021 10:25:07 -0700 Subject: [PATCH 09/12] Address feedback, add an assertion --- .../System/Threading/ThreadPool.CoreCLR.cs | 15 ++----- .../Threading/PortableThreadPool.Blocking.cs | 40 ++++++------------- .../PortableThreadPool.ThreadCounts.cs | 4 +- .../src/System/Threading/Tasks/Task.cs | 14 +++++-- .../System/Threading/ThreadPool.Portable.cs | 2 +- .../System/Threading/ThreadPoolWorkQueue.cs | 10 +---- 6 files changed, 33 insertions(+), 52 deletions(-) diff --git a/src/coreclr/System.Private.CoreLib/src/System/Threading/ThreadPool.CoreCLR.cs b/src/coreclr/System.Private.CoreLib/src/System/Threading/ThreadPool.CoreCLR.cs index dbf50bd81d080f..86dd19a5a75609 100644 --- a/src/coreclr/System.Private.CoreLib/src/System/Threading/ThreadPool.CoreCLR.cs +++ b/src/coreclr/System.Private.CoreLib/src/System/Threading/ThreadPool.CoreCLR.cs @@ -522,20 +522,13 @@ internal static void NotifyWorkItemProgress() [MethodImpl(MethodImplOptions.InternalCall)] private static extern void NotifyWorkItemProgressNative(); - internal static void NotifyThreadBlocked() - { - if (UsePortableThreadPool) - { - PortableThreadPool.ThreadPoolInstance.NotifyThreadBlocked(); - } - } + internal static bool NotifyThreadBlocked() => + UsePortableThreadPool && PortableThreadPool.ThreadPoolInstance.NotifyThreadBlocked(); internal static void NotifyThreadUnblocked() { - if (UsePortableThreadPool) - { - PortableThreadPool.ThreadPoolInstance.NotifyThreadUnblocked(); - } + Debug.Assert(UsePortableThreadPool); + PortableThreadPool.ThreadPoolInstance.NotifyThreadUnblocked(); } internal static object? GetOrCreateThreadLocalCompletionCountObject() => diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs index 151a93d3972c36..0ad38fd04d0151 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs @@ -22,19 +22,10 @@ private short TargetThreadsGoalForBlockingAdjustment { _threadAdjustmentLock.VerifyIsLocked(); - short targetThreadsGoal = _minThreads; - if (_numBlockedThreads <= 0) - { - return targetThreadsGoal; - } - - short maxThreads = MaxThreadsForBlockingAdjustment; - targetThreadsGoal += _numBlockedThreads; - if (targetThreadsGoal < _numBlockedThreads || targetThreadsGoal > maxThreads) - { - targetThreadsGoal = maxThreads; - } - return targetThreadsGoal; + return + _numBlockedThreads <= 0 + ? _minThreads + : (short)Math.Min((ushort)(_minThreads + _numBlockedThreads), (ushort)MaxThreadsForBlockingAdjustment); } } @@ -43,17 +34,16 @@ private short MaxThreadsForBlockingAdjustment get { _threadAdjustmentLock.VerifyIsLocked(); - - short result = (short)(_minThreads + BlockingConfig.MaxThreadsToAddBeforeFallback); - return result < BlockingConfig.MaxThreadsToAddBeforeFallback || result > _maxThreads ? _maxThreads : result; + return + (short)Math.Min((ushort)(_minThreads + BlockingConfig.MaxThreadsToAddBeforeFallback), (ushort)_maxThreads); } } - public void NotifyThreadBlocked() + public bool NotifyThreadBlocked() { if (!BlockingConfig.IsCooperativeBlockingEnabled || !Thread.CurrentThread.IsThreadPoolThread) { - return; + return false; } bool wakeGateThread = false; @@ -82,14 +72,13 @@ public void NotifyThreadBlocked() { GateThread.Wake(this); } + return true; } public void NotifyThreadUnblocked() { - if (!BlockingConfig.IsCooperativeBlockingEnabled || !Thread.CurrentThread.IsThreadPoolThread) - { - return; - } + Debug.Assert(BlockingConfig.IsCooperativeBlockingEnabled); + Debug.Assert(Thread.CurrentThread.IsThreadPoolThread); bool wakeGateThread = false; _threadAdjustmentLock.Acquire(); @@ -173,11 +162,8 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo } short maxThreads = MaxThreadsForBlockingAdjustment; - short configuredMaxThreadsWithoutDelay = (short)(_minThreads + BlockingConfig.ThreadsToAddWithoutDelay); - if (configuredMaxThreadsWithoutDelay < BlockingConfig.ThreadsToAddWithoutDelay) - { - configuredMaxThreadsWithoutDelay = maxThreads; - } + short configuredMaxThreadsWithoutDelay = + (short)Math.Min((ushort)(_minThreads + BlockingConfig.ThreadsToAddWithoutDelay), (ushort)maxThreads); do { diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs index ae2efb6c7761d4..d4673f5cd73296 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs @@ -49,7 +49,9 @@ public void SubtractNumProcessingWork(short value) public void InterlockedDecrementNumProcessingWork() { Debug.Assert(NumProcessingWorkShift == 0); - Interlocked.Decrement(ref _data); + + ThreadCounts counts = new ThreadCounts(Interlocked.Decrement(ref _data)); + Debug.Assert(counts.NumProcessingWork >= 0); } /// diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs index a638b65e1d083b..923c4cf71bcf4d 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs @@ -2973,14 +2973,17 @@ private bool SpinThenBlockingWait(int millisecondsTimeout, CancellationToken can #pragma warning disable CA1416 // Validate platform compatibility, issue: https://github.com/dotnet/runtime/issues/44622 if (infiniteWait) { - ThreadPool.NotifyThreadBlocked(); + bool notifyWhenUnblocked = ThreadPool.NotifyThreadBlocked(); try { returnValue = mres.Wait(Timeout.Infinite, cancellationToken); } finally { - ThreadPool.NotifyThreadUnblocked(); + if (notifyWhenUnblocked) + { + ThreadPool.NotifyThreadUnblocked(); + } } } else @@ -2988,14 +2991,17 @@ private bool SpinThenBlockingWait(int millisecondsTimeout, CancellationToken can uint elapsedTimeTicks = ((uint)Environment.TickCount) - startTimeTicks; if (elapsedTimeTicks < millisecondsTimeout) { - ThreadPool.NotifyThreadBlocked(); + bool notifyWhenUnblocked = ThreadPool.NotifyThreadBlocked(); try { returnValue = mres.Wait((int)(millisecondsTimeout - elapsedTimeTicks), cancellationToken); } finally { - ThreadPool.NotifyThreadUnblocked(); + if (notifyWhenUnblocked) + { + ThreadPool.NotifyThreadUnblocked(); + } } } } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Portable.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Portable.cs index a079b81713456c..70ac108e32d748 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Portable.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Portable.cs @@ -100,7 +100,7 @@ public static void GetAvailableThreads(out int workerThreads, out int completion internal static bool NotifyWorkItemComplete(object? threadLocalCompletionCountObject, int currentTimeMs) => PortableThreadPool.ThreadPoolInstance.NotifyWorkItemComplete(threadLocalCompletionCountObject, currentTimeMs); - internal static void NotifyThreadBlocked() => PortableThreadPool.ThreadPoolInstance.NotifyThreadBlocked(); + internal static bool NotifyThreadBlocked() => PortableThreadPool.ThreadPoolInstance.NotifyThreadBlocked(); internal static void NotifyThreadUnblocked() => PortableThreadPool.ThreadPoolInstance.NotifyThreadUnblocked(); internal static object GetOrCreateThreadLocalCompletionCountObject() => diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index be02240aff84f2..e0dfdc951cf87f 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -831,10 +831,8 @@ public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq) public void TransferLocalWork() { - object? cb; - while ((cb = workStealingQueue.LocalPop()) != null) + while (workStealingQueue.LocalPop() is object cb) { - Debug.Assert(null != cb); workQueue.Enqueue(cb, forceGlobal: true); } } @@ -844,11 +842,7 @@ public void TransferLocalWork() // Transfer any pending workitems into the global queue so that they will be executed by another thread if (null != workStealingQueue) { - if (null != workQueue) - { - TransferLocalWork(); - } - + TransferLocalWork(); ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue); } } From 8fc1f8a3ad61c47a26543023a45c9022dd72ba15 Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Wed, 2 Jun 2021 11:02:08 -0700 Subject: [PATCH 10/12] Fix browser build --- .../src/System/Threading/ThreadPool.Browser.Mono.cs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Mono.cs b/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Mono.cs index 2ca288fc8f5187..9cfc8c91999b52 100644 --- a/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Mono.cs +++ b/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Mono.cs @@ -90,9 +90,7 @@ internal static bool NotifyWorkItemComplete(object? threadLocalCompletionCountOb return true; } - internal static void NotifyThreadBlocked() - { - } + internal static bool NotifyThreadBlocked() => false; internal static void NotifyThreadUnblocked() { From 1fe206765366807b01160036f83b42bb7731d62f Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Fri, 4 Jun 2021 12:01:16 -0700 Subject: [PATCH 11/12] Remove max threads config for fallback, rename max delay config --- .../Threading/PortableThreadPool.Blocking.cs | 59 +++++-------------- .../tests/ThreadPoolTests.cs | 24 ++++---- 2 files changed, 25 insertions(+), 58 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs index 0ad38fd04d0151..ac47e5251eb5df 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs @@ -25,17 +25,7 @@ private short TargetThreadsGoalForBlockingAdjustment return _numBlockedThreads <= 0 ? _minThreads - : (short)Math.Min((ushort)(_minThreads + _numBlockedThreads), (ushort)MaxThreadsForBlockingAdjustment); - } - } - - private short MaxThreadsForBlockingAdjustment - { - get - { - _threadAdjustmentLock.VerifyIsLocked(); - return - (short)Math.Min((ushort)(_minThreads + BlockingConfig.MaxThreadsToAddBeforeFallback), (ushort)_maxThreads); + : (short)Math.Min((ushort)(_minThreads + _numBlockedThreads), (ushort)_maxThreads); } } @@ -161,9 +151,8 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo return 0; } - short maxThreads = MaxThreadsForBlockingAdjustment; short configuredMaxThreadsWithoutDelay = - (short)Math.Min((ushort)(_minThreads + BlockingConfig.ThreadsToAddWithoutDelay), (ushort)maxThreads); + (short)Math.Min((ushort)(_minThreads + BlockingConfig.ThreadsToAddWithoutDelay), (ushort)_maxThreads); do { @@ -171,7 +160,7 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo // waiting for work can be released for work without a delay, but creating a new thread may need a delay. ThreadCounts counts = _separated.counts; short maxThreadsGoalWithoutDelay = - Math.Max(configuredMaxThreadsWithoutDelay, Math.Min(counts.NumExistingThreads, maxThreads)); + Math.Max(configuredMaxThreadsWithoutDelay, Math.Min(counts.NumExistingThreads, _maxThreads)); short targetThreadsGoalWithoutDelay = Math.Min(targetThreadsGoal, maxThreadsGoalWithoutDelay); short newNumThreadsGoal; if (numThreadsGoal < targetThreadsGoalWithoutDelay) @@ -214,7 +203,7 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo counts.NumExistingThreads * (long)WorkerThread.EstimatedAdditionalStackUsagePerThreadBytes; // The memory limit may already be less than the total amount of physical memory. We are only accounting for - // thread pool worker threads above, and after fallback starvation will have to continue creating threads + // thread pool worker threads above, and after fallback starvation may have to continue creating threads // slowly to prevent a deadlock, so calculate a threshold before falling back by giving the memory limit // some additional buffer. long memoryThresholdForFallbackBytes = memoryLimitBytes * 8 / 10; @@ -256,7 +245,7 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo // limits on max thread count and max delays. _pendingBlockingAdjustment = PendingBlockingAdjustment.WithDelayIfNecessary; int delayStepCount = 1 + (numThreadsGoal - configuredMaxThreadsWithoutDelay) / BlockingConfig.ThreadsPerDelayStep; - return Math.Min((uint)delayStepCount * BlockingConfig.DelayStepMs, BlockingConfig.MaxDelayUntilFallbackMs); + return Math.Min((uint)delayStepCount * BlockingConfig.DelayStepMs, BlockingConfig.MaxDelayMs); } private enum PendingBlockingAdjustment : byte @@ -272,10 +261,9 @@ private static class BlockingConfig AppContextConfigHelper.GetBooleanConfig("System.Threading.ThreadPool.Blocking.CooperativeBlocking", true); public static readonly short ThreadsToAddWithoutDelay; - public static readonly short MaxThreadsToAddBeforeFallback; public static readonly short ThreadsPerDelayStep; public static readonly uint DelayStepMs; - public static readonly uint MaxDelayUntilFallbackMs; + public static readonly uint MaxDelayMs; #pragma warning disable CA1810 // remove the explicit static constructor static BlockingConfig() @@ -286,9 +274,7 @@ static BlockingConfig() // - After that, before each additional thread is created, a delay is induced, starting with DelayStepMs // - For every ThreadsPerDelayStep threads that are added with a delay, an additional DelayStepMs is added to // the delay - // - Until MaxThreadsToAddBeforeFallback threads are added, the delay may not exceed MaxDelayUntilFallbackMs - // - After MaxThreadsToAddBeforeFallback threads are added, the system operates in fallback mode where a thread - // would be created if starvation is detected and typically with longer delays + // - The delay may not exceed MaxDelayUntilFallbackMs // - Delays are only induced before creating threads. If threads are already available, they would be released // without delay to compensate for cooperative blocking. // - Physical memory usage and limits are also used and beyond a threshold, the system switches to fallback mode @@ -301,15 +287,6 @@ static BlockingConfig() 1, false); - // After the thread count based on MinThreads is reached, this value (after it is multiplied by the processor - // count) specifies how many additional threads may be created, perhaps with delays, before fallback. In - // fallback mode a thread would be created if starvation is detected and typically with longer delays. - int blocking_maxThreadsToAddBeforeFallback_procCountFactor = - AppContextConfigHelper.GetInt32Config( - "System.Threading.ThreadPool.Blocking.MaxThreadsToAddBeforeFallback_ProcCountFactor", - 10, - false); - // After the thread count based on ThreadsToAddWithoutDelay is reached, this value (after it is multiplied by // the processor count) specifies after how many threads an additional DelayStepMs would be added to the delay // before each new thread is created @@ -328,31 +305,23 @@ static BlockingConfig() false); // After the thread count based on ThreadsToAddWithoutDelay is reached, this value specifies the max delay to - // use before each new thread is created, until fallback. Fallback mode is used after the thread count based on - // MaxThreadsToAddBeforeFallback is reached, see that config value for more information. - MaxDelayUntilFallbackMs = + // use before each new thread is created + MaxDelayMs = (uint)AppContextConfigHelper.GetInt32Config( "System.Threading.ThreadPool.Blocking.MaxDelayUntilFallbackMs", 250, false); int processorCount = Environment.ProcessorCount; - MaxThreadsToAddBeforeFallback = (short)(processorCount * blocking_maxThreadsToAddBeforeFallback_procCountFactor); - if (MaxThreadsToAddBeforeFallback > MaxPossibleThreadCount || - MaxThreadsToAddBeforeFallback / processorCount != blocking_maxThreadsToAddBeforeFallback_procCountFactor) - { - MaxThreadsToAddBeforeFallback = MaxPossibleThreadCount; - } - ThreadsToAddWithoutDelay = (short)(processorCount * blocking_threadsToAddWithoutDelay_procCountFactor); - if (ThreadsToAddWithoutDelay > MaxThreadsToAddBeforeFallback || + if (ThreadsToAddWithoutDelay > MaxPossibleThreadCount || ThreadsToAddWithoutDelay / processorCount != blocking_threadsToAddWithoutDelay_procCountFactor) { - ThreadsToAddWithoutDelay = MaxThreadsToAddBeforeFallback; + ThreadsToAddWithoutDelay = MaxPossibleThreadCount; } blocking_threadsPerDelayStep_procCountFactor = Math.Max(1, blocking_threadsPerDelayStep_procCountFactor); - short maxThreadsPerDelayStep = (short)(MaxThreadsToAddBeforeFallback - ThreadsToAddWithoutDelay); + short maxThreadsPerDelayStep = (short)(MaxPossibleThreadCount - ThreadsToAddWithoutDelay); ThreadsPerDelayStep = (short)(processorCount * blocking_threadsPerDelayStep_procCountFactor); if (ThreadsPerDelayStep > maxThreadsPerDelayStep || @@ -361,8 +330,8 @@ static BlockingConfig() ThreadsPerDelayStep = maxThreadsPerDelayStep; } - MaxDelayUntilFallbackMs = Math.Max(1, Math.Min(MaxDelayUntilFallbackMs, GateThread.GateActivitiesPeriodMs)); - DelayStepMs = Math.Max(1, Math.Min(DelayStepMs, MaxDelayUntilFallbackMs)); + MaxDelayMs = Math.Max(1, Math.Min(MaxDelayMs, GateThread.GateActivitiesPeriodMs)); + DelayStepMs = Math.Max(1, Math.Min(DelayStepMs, MaxDelayMs)); } #pragma warning restore CA1810 } diff --git a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs index 3ec4640acf8558..767e5bef92c165 100644 --- a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs +++ b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs @@ -893,25 +893,23 @@ public static void CooperativeBlockingCanCreateThreadsFaster() // would cause the thread pool to create threads RemoteExecutor.Invoke(() => { - // All but the last of these work items will block and the last queued work item would release the blocking. Without - // cooperative blocking, this would lead to starvation after work items run. Since starvation adds threads - // at a rate of at most 2 per second, the extra 120 work items would take roughly 60 seconds to get unblocked and since - // the test waits for 30 seconds it would time out. Cooperative blocking is configured below to ensure that starvation - // would not be hit for this many work items, and to increase the rate of thread injection for testing purposes while - // getting a decent amount of coverage for its behavior. With cooperative blocking as configured below, the test should - // finish within a few seconds. + // All but the last of these work items will block and the last queued work item would release the blocking. + // Without cooperative blocking, this would lead to starvation after work items run. Since + // starvation adds threads at a rate of at most 2 per second, the extra 120 work items would take roughly 60 + // seconds to get unblocked and since the test waits for 30 seconds it would time out. Cooperative blocking is + // configured below to increase the rate of thread injection for testing purposes while getting a decent amount + // of coverage for its behavior. With cooperative blocking as configured below, the test should finish within a + // few seconds. int processorCount = Environment.ProcessorCount; int workItemCount = processorCount + 120; - int workItemCount_procCountFactor = workItemCount / processorCount; - workItemCount = workItemCount_procCountFactor * processorCount; SetBlockingConfigValue("ThreadsToAddWithoutDelay_ProcCountFactor", 1); - SetBlockingConfigValue("MaxThreadsToAddBeforeFallback_ProcCountFactor", workItemCount_procCountFactor); - SetBlockingConfigValue("MaxDelayUntilFallbackMs", 1); + SetBlockingConfigValue("MaxDelayMs", 1); var allWorkItemsUnblocked = new AutoResetEvent(false); - // Run a second iteration for some extra coverage. Iterations after the first one would be much faster because the - // necessary number of threads would already have been created by then, and would not add much to the test time. + // Run a second iteration for some extra coverage. Iterations after the first one would be much faster because + // the necessary number of threads would already have been created by then, and would not add much to the test + // time. for (int iterationIndex = 0; iterationIndex < 2; ++iterationIndex) { var tcs = new TaskCompletionSource(); From e7b950846303bc475bcf2265d331a903e8ccbba7 Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Fri, 4 Jun 2021 12:15:12 -0700 Subject: [PATCH 12/12] Actually rename config --- .../src/System/Threading/PortableThreadPool.Blocking.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs index ac47e5251eb5df..1ac8ecb60c91d1 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs @@ -274,10 +274,11 @@ static BlockingConfig() // - After that, before each additional thread is created, a delay is induced, starting with DelayStepMs // - For every ThreadsPerDelayStep threads that are added with a delay, an additional DelayStepMs is added to // the delay - // - The delay may not exceed MaxDelayUntilFallbackMs + // - The delay may not exceed MaxDelayMs // - Delays are only induced before creating threads. If threads are already available, they would be released // without delay to compensate for cooperative blocking. // - Physical memory usage and limits are also used and beyond a threshold, the system switches to fallback mode + // where threads would be created if starvation is detected, typically with higher delays // After the thread count based on MinThreads is reached, this value (after it is multiplied by the processor // count) specifies how many additional threads may be created without a delay @@ -308,7 +309,7 @@ static BlockingConfig() // use before each new thread is created MaxDelayMs = (uint)AppContextConfigHelper.GetInt32Config( - "System.Threading.ThreadPool.Blocking.MaxDelayUntilFallbackMs", + "System.Threading.ThreadPool.Blocking.MaxDelayMs", 250, false);