Skip to content

Commit a7a2fd6

Browse files
authored
Improve the rate of thread injection for blocking due to sync-over-async (#53471)
* Improve the rate of thread injection for blocking due to sync-over-async Fixes #52558
1 parent 27baae9 commit a7a2fd6

File tree

14 files changed

+821
-206
lines changed

14 files changed

+821
-206
lines changed

src/coreclr/System.Private.CoreLib/src/System/Threading/ThreadPool.CoreCLR.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,15 @@ internal static void NotifyWorkItemProgress()
522522
[MethodImpl(MethodImplOptions.InternalCall)]
523523
private static extern void NotifyWorkItemProgressNative();
524524

525+
internal static bool NotifyThreadBlocked() =>
526+
UsePortableThreadPool && PortableThreadPool.ThreadPoolInstance.NotifyThreadBlocked();
527+
528+
internal static void NotifyThreadUnblocked()
529+
{
530+
Debug.Assert(UsePortableThreadPool);
531+
PortableThreadPool.ThreadPoolInstance.NotifyThreadUnblocked();
532+
}
533+
525534
internal static object? GetOrCreateThreadLocalCompletionCountObject() =>
526535
UsePortableThreadPool ? PortableThreadPool.ThreadPoolInstance.GetOrCreateThreadLocalCompletionCountObject() : null;
527536

src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2167,9 +2167,10 @@
21672167
<ItemGroup Condition="'$(FeaturePortableThreadPool)' == 'true'">
21682168
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadPool.Portable.cs" Condition="'$(FeatureCoreCLR)' != 'true'" />
21692169
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadPoolBoundHandle.PlatformNotSupported.cs" Condition="'$(FeatureCoreCLR)' != 'true'" />
2170-
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.cs" />
21712170
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\NativeRuntimeEventSource.PortableThreadPool.cs" Condition="'$(FeatureCoreCLR)' != 'true' and '$(FeatureMono)' != 'true'" />
21722171
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\NativeRuntimeEventSource.PortableThreadPool.NativeSinks.cs" Condition="'$(FeatureCoreCLR)' == 'true' or '$(FeatureMono)' == 'true'"/>
2172+
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.cs" />
2173+
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Blocking.cs" />
21732174
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.GateThread.cs" />
21742175
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.HillClimbing.cs" />
21752176
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.HillClimbing.Complex.cs" />

src/libraries/System.Private.CoreLib/src/System/Threading/NativeRuntimeEventSource.PortableThreadPool.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ public enum ThreadAdjustmentReasonMap : uint
6666
ChangePoint,
6767
Stabilizing,
6868
Starvation,
69-
ThreadTimedOut
69+
ThreadTimedOut,
70+
CooperativeBlocking,
7071
}
7172

7273
#if !ES_BUILD_STANDALONE
Lines changed: 340 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,340 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Diagnostics;
5+
6+
namespace System.Threading
7+
{
8+
internal sealed partial class PortableThreadPool
9+
{
10+
public short MinThreadsGoal
11+
{
12+
get
13+
{
14+
_threadAdjustmentLock.VerifyIsLocked();
15+
return Math.Min(_separated.numThreadsGoal, TargetThreadsGoalForBlockingAdjustment);
16+
}
17+
}
18+
19+
private short TargetThreadsGoalForBlockingAdjustment
20+
{
21+
get
22+
{
23+
_threadAdjustmentLock.VerifyIsLocked();
24+
25+
return
26+
_numBlockedThreads <= 0
27+
? _minThreads
28+
: (short)Math.Min((ushort)(_minThreads + _numBlockedThreads), (ushort)_maxThreads);
29+
}
30+
}
31+
32+
public bool NotifyThreadBlocked()
33+
{
34+
if (!BlockingConfig.IsCooperativeBlockingEnabled || !Thread.CurrentThread.IsThreadPoolThread)
35+
{
36+
return false;
37+
}
38+
39+
bool wakeGateThread = false;
40+
_threadAdjustmentLock.Acquire();
41+
try
42+
{
43+
_numBlockedThreads++;
44+
Debug.Assert(_numBlockedThreads > 0);
45+
46+
if (_pendingBlockingAdjustment != PendingBlockingAdjustment.WithDelayIfNecessary &&
47+
_separated.numThreadsGoal < TargetThreadsGoalForBlockingAdjustment)
48+
{
49+
if (_pendingBlockingAdjustment == PendingBlockingAdjustment.None)
50+
{
51+
wakeGateThread = true;
52+
}
53+
_pendingBlockingAdjustment = PendingBlockingAdjustment.WithDelayIfNecessary;
54+
}
55+
}
56+
finally
57+
{
58+
_threadAdjustmentLock.Release();
59+
}
60+
61+
if (wakeGateThread)
62+
{
63+
GateThread.Wake(this);
64+
}
65+
return true;
66+
}
67+
68+
public void NotifyThreadUnblocked()
69+
{
70+
Debug.Assert(BlockingConfig.IsCooperativeBlockingEnabled);
71+
Debug.Assert(Thread.CurrentThread.IsThreadPoolThread);
72+
73+
bool wakeGateThread = false;
74+
_threadAdjustmentLock.Acquire();
75+
try
76+
{
77+
Debug.Assert(_numBlockedThreads > 0);
78+
_numBlockedThreads--;
79+
80+
if (_pendingBlockingAdjustment != PendingBlockingAdjustment.Immediately &&
81+
_numThreadsAddedDueToBlocking > 0 &&
82+
_separated.numThreadsGoal > TargetThreadsGoalForBlockingAdjustment)
83+
{
84+
wakeGateThread = true;
85+
_pendingBlockingAdjustment = PendingBlockingAdjustment.Immediately;
86+
}
87+
}
88+
finally
89+
{
90+
_threadAdjustmentLock.Release();
91+
}
92+
93+
if (wakeGateThread)
94+
{
95+
GateThread.Wake(this);
96+
}
97+
}
98+
99+
private uint PerformBlockingAdjustment(bool previousDelayElapsed)
100+
{
101+
uint nextDelayMs;
102+
bool addWorker;
103+
_threadAdjustmentLock.Acquire();
104+
try
105+
{
106+
nextDelayMs = PerformBlockingAdjustment(previousDelayElapsed, out addWorker);
107+
}
108+
finally
109+
{
110+
_threadAdjustmentLock.Release();
111+
}
112+
113+
if (addWorker)
114+
{
115+
WorkerThread.MaybeAddWorkingWorker(this);
116+
}
117+
return nextDelayMs;
118+
}
119+
120+
private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWorker)
121+
{
122+
_threadAdjustmentLock.VerifyIsLocked();
123+
Debug.Assert(_pendingBlockingAdjustment != PendingBlockingAdjustment.None);
124+
125+
_pendingBlockingAdjustment = PendingBlockingAdjustment.None;
126+
addWorker = false;
127+
128+
short targetThreadsGoal = TargetThreadsGoalForBlockingAdjustment;
129+
short numThreadsGoal = _separated.numThreadsGoal;
130+
if (numThreadsGoal == targetThreadsGoal)
131+
{
132+
return 0;
133+
}
134+
135+
if (numThreadsGoal > targetThreadsGoal)
136+
{
137+
// The goal is only decreased by how much it was increased in total due to blocking adjustments. This is to
138+
// allow blocking adjustments to play well with starvation and hill climbing, either of which may increase the
139+
// goal independently for other reasons, and blocking adjustments should not undo those changes.
140+
if (_numThreadsAddedDueToBlocking <= 0)
141+
{
142+
return 0;
143+
}
144+
145+
short toSubtract = Math.Min((short)(numThreadsGoal - targetThreadsGoal), _numThreadsAddedDueToBlocking);
146+
_numThreadsAddedDueToBlocking -= toSubtract;
147+
_separated.numThreadsGoal = numThreadsGoal -= toSubtract;
148+
HillClimbing.ThreadPoolHillClimber.ForceChange(
149+
numThreadsGoal,
150+
HillClimbing.StateOrTransition.CooperativeBlocking);
151+
return 0;
152+
}
153+
154+
short configuredMaxThreadsWithoutDelay =
155+
(short)Math.Min((ushort)(_minThreads + BlockingConfig.ThreadsToAddWithoutDelay), (ushort)_maxThreads);
156+
157+
do
158+
{
159+
// Calculate how many threads can be added without a delay. Threads that were already created but may be just
160+
// waiting for work can be released for work without a delay, but creating a new thread may need a delay.
161+
ThreadCounts counts = _separated.counts;
162+
short maxThreadsGoalWithoutDelay =
163+
Math.Max(configuredMaxThreadsWithoutDelay, Math.Min(counts.NumExistingThreads, _maxThreads));
164+
short targetThreadsGoalWithoutDelay = Math.Min(targetThreadsGoal, maxThreadsGoalWithoutDelay);
165+
short newNumThreadsGoal;
166+
if (numThreadsGoal < targetThreadsGoalWithoutDelay)
167+
{
168+
newNumThreadsGoal = targetThreadsGoalWithoutDelay;
169+
}
170+
else if (previousDelayElapsed)
171+
{
172+
newNumThreadsGoal = (short)(numThreadsGoal + 1);
173+
}
174+
else
175+
{
176+
// Need to induce a delay before adding a thread
177+
break;
178+
}
179+
180+
do
181+
{
182+
if (newNumThreadsGoal <= counts.NumExistingThreads)
183+
{
184+
break;
185+
}
186+
187+
//
188+
// Threads would likely need to be created to compensate for blocking, so check memory usage and limits
189+
//
190+
191+
long memoryLimitBytes = _memoryLimitBytes;
192+
if (memoryLimitBytes <= 0)
193+
{
194+
break;
195+
}
196+
197+
// Memory usage is updated after gen 2 GCs, and roughly represents how much physical memory was in use at
198+
// the time of the last gen 2 GC. When new threads are also blocking, they may not have used their typical
199+
// amount of stack space, and gen 2 GCs may not be happening to update the memory usage. Account for a bit
200+
// of extra stack space usage in the future for each thread.
201+
long memoryUsageBytes =
202+
_memoryUsageBytes +
203+
counts.NumExistingThreads * (long)WorkerThread.EstimatedAdditionalStackUsagePerThreadBytes;
204+
205+
// The memory limit may already be less than the total amount of physical memory. We are only accounting for
206+
// thread pool worker threads above, and after fallback starvation may have to continue creating threads
207+
// slowly to prevent a deadlock, so calculate a threshold before falling back by giving the memory limit
208+
// some additional buffer.
209+
long memoryThresholdForFallbackBytes = memoryLimitBytes * 8 / 10;
210+
if (memoryUsageBytes >= memoryThresholdForFallbackBytes)
211+
{
212+
return 0;
213+
}
214+
215+
// Determine how many threads can be added without exceeding the memory threshold
216+
long achievableNumThreadsGoal =
217+
counts.NumExistingThreads +
218+
(memoryThresholdForFallbackBytes - memoryUsageBytes) /
219+
WorkerThread.EstimatedAdditionalStackUsagePerThreadBytes;
220+
newNumThreadsGoal = (short)Math.Min(newNumThreadsGoal, achievableNumThreadsGoal);
221+
if (newNumThreadsGoal <= numThreadsGoal)
222+
{
223+
return 0;
224+
}
225+
} while (false);
226+
227+
_numThreadsAddedDueToBlocking += (short)(newNumThreadsGoal - numThreadsGoal);
228+
_separated.numThreadsGoal = newNumThreadsGoal;
229+
HillClimbing.ThreadPoolHillClimber.ForceChange(
230+
newNumThreadsGoal,
231+
HillClimbing.StateOrTransition.CooperativeBlocking);
232+
if (counts.NumProcessingWork >= numThreadsGoal && _separated.numRequestedWorkers > 0)
233+
{
234+
addWorker = true;
235+
}
236+
237+
numThreadsGoal = newNumThreadsGoal;
238+
if (numThreadsGoal >= targetThreadsGoal)
239+
{
240+
return 0;
241+
}
242+
} while (false);
243+
244+
// Calculate how much delay to induce before another thread is created. These operations don't overflow because of
245+
// limits on max thread count and max delays.
246+
_pendingBlockingAdjustment = PendingBlockingAdjustment.WithDelayIfNecessary;
247+
int delayStepCount = 1 + (numThreadsGoal - configuredMaxThreadsWithoutDelay) / BlockingConfig.ThreadsPerDelayStep;
248+
return Math.Min((uint)delayStepCount * BlockingConfig.DelayStepMs, BlockingConfig.MaxDelayMs);
249+
}
250+
251+
private enum PendingBlockingAdjustment : byte
252+
{
253+
None,
254+
Immediately,
255+
WithDelayIfNecessary
256+
}
257+
258+
private static class BlockingConfig
259+
{
260+
public static readonly bool IsCooperativeBlockingEnabled =
261+
AppContextConfigHelper.GetBooleanConfig("System.Threading.ThreadPool.Blocking.CooperativeBlocking", true);
262+
263+
public static readonly short ThreadsToAddWithoutDelay;
264+
public static readonly short ThreadsPerDelayStep;
265+
public static readonly uint DelayStepMs;
266+
public static readonly uint MaxDelayMs;
267+
268+
#pragma warning disable CA1810 // remove the explicit static constructor
269+
static BlockingConfig()
270+
{
271+
// Summary description of how blocking compensation works and how the config settings below are used:
272+
// - After the thread count based on MinThreads is reached, up to ThreadsToAddWithoutDelay additional threads
273+
// may be created without a delay
274+
// - After that, before each additional thread is created, a delay is induced, starting with DelayStepMs
275+
// - For every ThreadsPerDelayStep threads that are added with a delay, an additional DelayStepMs is added to
276+
// the delay
277+
// - The delay may not exceed MaxDelayMs
278+
// - Delays are only induced before creating threads. If threads are already available, they would be released
279+
// without delay to compensate for cooperative blocking.
280+
// - Physical memory usage and limits are also used and beyond a threshold, the system switches to fallback mode
281+
// where threads would be created if starvation is detected, typically with higher delays
282+
283+
// After the thread count based on MinThreads is reached, this value (after it is multiplied by the processor
284+
// count) specifies how many additional threads may be created without a delay
285+
int blocking_threadsToAddWithoutDelay_procCountFactor =
286+
AppContextConfigHelper.GetInt32Config(
287+
"System.Threading.ThreadPool.Blocking.ThreadsToAddWithoutDelay_ProcCountFactor",
288+
1,
289+
false);
290+
291+
// After the thread count based on ThreadsToAddWithoutDelay is reached, this value (after it is multiplied by
292+
// the processor count) specifies after how many threads an additional DelayStepMs would be added to the delay
293+
// before each new thread is created
294+
int blocking_threadsPerDelayStep_procCountFactor =
295+
AppContextConfigHelper.GetInt32Config(
296+
"System.Threading.ThreadPool.Blocking.ThreadsPerDelayStep_ProcCountFactor",
297+
1,
298+
false);
299+
300+
// After the thread count based on ThreadsToAddWithoutDelay is reached, this value specifies how much additional
301+
// delay to add per ThreadsPerDelayStep threads, which would be applied before each new thread is created
302+
DelayStepMs =
303+
(uint)AppContextConfigHelper.GetInt32Config(
304+
"System.Threading.ThreadPool.Blocking.DelayStepMs",
305+
25,
306+
false);
307+
308+
// After the thread count based on ThreadsToAddWithoutDelay is reached, this value specifies the max delay to
309+
// use before each new thread is created
310+
MaxDelayMs =
311+
(uint)AppContextConfigHelper.GetInt32Config(
312+
"System.Threading.ThreadPool.Blocking.MaxDelayMs",
313+
250,
314+
false);
315+
316+
int processorCount = Environment.ProcessorCount;
317+
ThreadsToAddWithoutDelay = (short)(processorCount * blocking_threadsToAddWithoutDelay_procCountFactor);
318+
if (ThreadsToAddWithoutDelay > MaxPossibleThreadCount ||
319+
ThreadsToAddWithoutDelay / processorCount != blocking_threadsToAddWithoutDelay_procCountFactor)
320+
{
321+
ThreadsToAddWithoutDelay = MaxPossibleThreadCount;
322+
}
323+
324+
blocking_threadsPerDelayStep_procCountFactor = Math.Max(1, blocking_threadsPerDelayStep_procCountFactor);
325+
short maxThreadsPerDelayStep = (short)(MaxPossibleThreadCount - ThreadsToAddWithoutDelay);
326+
ThreadsPerDelayStep =
327+
(short)(processorCount * blocking_threadsPerDelayStep_procCountFactor);
328+
if (ThreadsPerDelayStep > maxThreadsPerDelayStep ||
329+
ThreadsPerDelayStep / processorCount != blocking_threadsPerDelayStep_procCountFactor)
330+
{
331+
ThreadsPerDelayStep = maxThreadsPerDelayStep;
332+
}
333+
334+
MaxDelayMs = Math.Max(1, Math.Min(MaxDelayMs, GateThread.GateActivitiesPeriodMs));
335+
DelayStepMs = Math.Max(1, Math.Min(DelayStepMs, MaxDelayMs));
336+
}
337+
#pragma warning restore CA1810
338+
}
339+
}
340+
}

0 commit comments

Comments
 (0)