Skip to content

Commit 83237c9

Browse files
authored
Multiple accept loops in named pipes transport (#46259)
1 parent 6a4cb64 commit 83237c9

File tree

4 files changed

+90
-8
lines changed

4 files changed

+90
-8
lines changed

src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ internal sealed class NamedPipeConnectionListener : IConnectionListener
2626
private readonly PipeOptions _inputOptions;
2727
private readonly PipeOptions _outputOptions;
2828
private readonly Mutex _mutex;
29-
private Task? _listeningTask;
29+
private Task[]? _listeningTasks;
3030
private int _disposed;
3131

3232
public NamedPipeConnectionListener(
@@ -45,7 +45,7 @@ public NamedPipeConnectionListener(
4545
// The OS maintains a backlog of clients that are waiting to connect, so the app queue only stores a single connection.
4646
// We want to have a queue plus a background task that populates the queue, rather than creating NamedPipeServerStream
4747
// when AcceptAsync is called, so that the server is always the owner of the pipe name.
48-
_acceptedQueue = Channel.CreateBounded<ConnectionContext>(new BoundedChannelOptions(capacity: 1) { SingleWriter = true });
48+
_acceptedQueue = Channel.CreateBounded<ConnectionContext>(new BoundedChannelOptions(capacity: 1));
4949

5050
var maxReadBufferSize = _options.MaxReadBufferSize ?? 0;
5151
var maxWriteBufferSize = _options.MaxWriteBufferSize ?? 0;
@@ -56,12 +56,17 @@ public NamedPipeConnectionListener(
5656

5757
public void Start()
5858
{
59-
Debug.Assert(_listeningTask == null, "Already started");
59+
Debug.Assert(_listeningTasks == null, "Already started");
6060

61-
// Start first stream inline to catch creation errors.
62-
var initialStream = CreateServerStream();
61+
_listeningTasks = new Task[_options.ListenerQueueCount];
6362

64-
_listeningTask = StartAsync(initialStream);
63+
for (var i = 0; i < _listeningTasks.Length; i++)
64+
{
65+
// Start first stream inline to catch creation errors.
66+
var initialStream = CreateServerStream();
67+
68+
_listeningTasks[i] = Task.Run(() => StartAsync(initialStream));
69+
}
6570
}
6671

6772
public EndPoint EndPoint => _endpoint;
@@ -182,9 +187,9 @@ public async ValueTask DisposeAsync()
182187

183188
_listeningTokenSource.Dispose();
184189
_mutex.Dispose();
185-
if (_listeningTask != null)
190+
if (_listeningTasks != null)
186191
{
187-
await _listeningTask;
192+
await Task.WhenAll(_listeningTasks);
188193
}
189194
}
190195
}

src/Servers/Kestrel/Transport.NamedPipes/src/NamedPipeTransportOptions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,14 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes;
1111
/// </summary>
1212
public sealed class NamedPipeTransportOptions
1313
{
14+
/// <summary>
15+
/// The number of listener queues used to accept name pipe connections.
16+
/// </summary>
17+
/// <remarks>
18+
/// Defaults to <see cref="Environment.ProcessorCount" /> rounded down and clamped between 1 and 16.
19+
/// </remarks>
20+
public int ListenerQueueCount { get; set; } = Math.Min(Environment.ProcessorCount, 16);
21+
1422
/// <summary>
1523
/// Gets or sets the maximum unconsumed incoming bytes the transport will buffer.
1624
/// <para>

src/Servers/Kestrel/Transport.NamedPipes/src/PublicAPI.Unshipped.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ Microsoft.AspNetCore.Hosting.WebHostBuilderNamedPipeExtensions
33
Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions
44
Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.CurrentUserOnly.get -> bool
55
Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.CurrentUserOnly.set -> void
6+
Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.ListenerQueueCount.get -> int
7+
Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.ListenerQueueCount.set -> void
68
Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.MaxReadBufferSize.get -> long?
79
Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.MaxReadBufferSize.set -> void
810
Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.NamedPipeTransportOptions.MaxWriteBufferSize.get -> long?

src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,73 @@ public async Task AcceptAsync_UnbindAfterCall_CleanExitAndLog()
6868
Assert.Contains(LogMessages, m => m.EventId.Name == "ConnectionListenerAborted");
6969
}
7070

71+
[Theory]
72+
[InlineData(1)]
73+
[InlineData(4)]
74+
[InlineData(16)]
75+
public async Task AcceptAsync_ParallelConnections_ClientConnectionsSuccessfullyAccepted(int listenerQueueCount)
76+
{
77+
// Arrange
78+
const int ParallelCount = 10;
79+
const int ParallelCallCount = 250;
80+
const int TotalCallCount = ParallelCount * ParallelCallCount;
81+
82+
var currentCallCount = 0;
83+
var options = new NamedPipeTransportOptions();
84+
options.ListenerQueueCount = listenerQueueCount;
85+
await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory, options: options);
86+
87+
// Act
88+
var serverTask = Task.Run(async () =>
89+
{
90+
while (currentCallCount < TotalCallCount)
91+
{
92+
_ = await connectionListener.AcceptAsync();
93+
94+
currentCallCount++;
95+
96+
Logger.LogInformation($"Server accepted {currentCallCount} calls.");
97+
}
98+
99+
Logger.LogInformation($"Server task complete.");
100+
});
101+
102+
var cts = new CancellationTokenSource();
103+
var parallelTasks = new List<Task>();
104+
for (var i = 0; i < ParallelCount; i++)
105+
{
106+
parallelTasks.Add(Task.Run(async () =>
107+
{
108+
var clientStreamCount = 0;
109+
while (clientStreamCount < ParallelCallCount)
110+
{
111+
try
112+
{
113+
var clientStream = NamedPipeTestHelpers.CreateClientStream(connectionListener.EndPoint);
114+
await clientStream.ConnectAsync(cts.Token);
115+
116+
await clientStream.WriteAsync(new byte[1], cts.Token);
117+
await clientStream.DisposeAsync();
118+
clientStreamCount++;
119+
}
120+
catch (IOException ex)
121+
{
122+
Logger.LogInformation(ex, "Client exception.");
123+
}
124+
catch (OperationCanceledException)
125+
{
126+
break;
127+
}
128+
}
129+
}));
130+
}
131+
132+
await serverTask.DefaultTimeout();
133+
134+
cts.Cancel();
135+
await Task.WhenAll(parallelTasks).DefaultTimeout();
136+
}
137+
71138
[ConditionalFact]
72139
[OSSkipCondition(OperatingSystems.Linux | OperatingSystems.MacOSX, SkipReason = "Non-OS implementations use UDS with an unlimited accept limit.")]
73140
public async Task AcceptAsync_HitBacklogLimit_ClientConnectionsSuccessfullyAccepted()

0 commit comments

Comments
 (0)