Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@
Link="Common\System\Net\SocketProtocolSupportPal.Unix" />
<Compile Include="$(CommonPath)System\Net\Sockets\SocketErrorPal.Unix.cs"
Link="Common\System\Net\Sockets\SocketErrorPal.Unix" />
<Compile Include="$(CommonPath)System\Runtime\CompilerServices\PreserveDependencyAttribute.cs"
Link="Common\System\Runtime\CompilerServices\PreserveDependencyAttribute.cs" />
<Compile Include="$(CommonPath)System\Threading\Tasks\TaskToApm.cs"
Link="Common\System\Threading\Tasks\TaskToApm.cs" />
<Compile Include="$(CommonPath)Interop\Unix\Interop.Errors.cs"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public partial class SafeSocketHandle
internal bool LastConnectFailed { get; set; }
internal bool DualMode { get; set; }
internal bool ExposedHandleOrUntrackedConfiguration { get; private set; }
internal bool PreferInlineCompletions { get; set; } = SocketAsyncEngine.InlineSocketCompletionsEnabled;

internal void RegisterConnectResult(SocketError error)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ public SocketInformation DuplicateAndClose(int targetProcessId)
throw new PlatformNotSupportedException(SR.net_sockets_duplicateandclose_notsupported);
}

internal bool PreferInlineCompletions
{
get => _handle.PreferInlineCompletions;
set => _handle.PreferInlineCompletions = value;
}

partial void ValidateForMultiConnect(bool isMultiEndpoint)
{
// ValidateForMultiConnect is called before any {Begin}Connect{Async} call,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation
}
}

public AsyncOperation? ProcessSyncEventOrGetAsyncEvent(SocketAsyncContext context, bool skipAsyncEvents = false)
public AsyncOperation? ProcessSyncEventOrGetAsyncEvent(SocketAsyncContext context, bool skipAsyncEvents = false, bool processAsyncEvents = true)
{
AsyncOperation op;
using (Lock())
Expand All @@ -864,6 +864,7 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation
Debug.Assert(_isNextOperationSynchronous == (op.Event != null));
if (skipAsyncEvents && !_isNextOperationSynchronous)
{
Debug.Assert(!processAsyncEvents);
// Return the operation to indicate that the async operation was not processed, without making
// any state changes because async operations are being skipped
return op;
Expand Down Expand Up @@ -901,6 +902,11 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation
{
// Async operation. The caller will figure out how to process the IO.
Debug.Assert(!skipAsyncEvents);
if (processAsyncEvents)
{
op.Process();
return null;
}
return op;
}
}
Expand Down Expand Up @@ -1190,6 +1196,14 @@ public SocketAsyncContext(SafeSocketHandle socket)
_sendQueue.Init();
}

public bool PreferInlineCompletions
{
// Socket.PreferInlineCompletions is an experimental API with internal access modifier.
// PreserveDependency ensures the setter is available externally using reflection.
[PreserveDependency("set_PreferInlineCompletions", "System.Net.Sockets.Socket")]
get => _socket.PreferInlineCompletions;
}

private void Register()
{
Debug.Assert(_nonBlockingSet);
Expand Down Expand Up @@ -2051,15 +2065,33 @@ public Interop.Sys.SocketEvents HandleSyncEventsSpeculatively(Interop.Sys.Socket
return events;
}

public unsafe void HandleEvents(Interop.Sys.SocketEvents events)
// Called on the epoll thread.
public void HandleEventsInline(Interop.Sys.SocketEvents events)
{
if ((events & Interop.Sys.SocketEvents.Error) != 0)
{
// Set the Read and Write flags as well; the processing for these events
// Set the Read and Write flags; the processing for these events
// will pick up the error.
events ^= Interop.Sys.SocketEvents.Error;
events |= Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write;
}

if ((events & Interop.Sys.SocketEvents.Read) != 0)
{
_receiveQueue.ProcessSyncEventOrGetAsyncEvent(this, processAsyncEvents: true);
}

if ((events & Interop.Sys.SocketEvents.Write) != 0)
{
_sendQueue.ProcessSyncEventOrGetAsyncEvent(this, processAsyncEvents: true);
}
}

// Called on ThreadPool thread.
public unsafe void HandleEvents(Interop.Sys.SocketEvents events)
{
Debug.Assert((events & Interop.Sys.SocketEvents.Error) == 0);

AsyncOperation? receiveOperation =
(events & Interop.Sys.SocketEvents.Read) != 0 ? _receiveQueue.ProcessSyncEventOrGetAsyncEvent(this) : null;
AsyncOperation? sendOperation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ internal sealed unsafe class SocketAsyncEngine : IThreadPoolWorkItem
1024;
#endif

// Socket continuations are dispatched to the ThreadPool from the event thread.
// This avoids continuations blocking the event handling.
// Setting PreferInlineCompletions allows continuations to run directly on the event thread.
// PreferInlineCompletions defaults to false and can be set to true using the DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS envvar.
internal static readonly bool InlineSocketCompletionsEnabled = Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS") == "1";

private static int GetEngineCount()
{
// The responsibility of SocketAsyncEngine is to get notifications from epoll|kqueue
Expand All @@ -38,6 +44,12 @@ private static int GetEngineCount()
return (int)count;
}

// When inlining continuations, we default to ProcessorCount to make sure event threads cannot be a bottleneck.
if (InlineSocketCompletionsEnabled)
{
return Environment.ProcessorCount;
}

Architecture architecture = RuntimeInformation.ProcessArchitecture;
int coresPerEngine = architecture == Architecture.Arm64 || architecture == Architecture.Arm
? 8
Expand Down Expand Up @@ -195,17 +207,25 @@ private void EventLoop()

if (handleToContextMap.TryGetValue(handle, out SocketAsyncContextWrapper contextWrapper) && (context = contextWrapper.Context) != null)
{
Interop.Sys.SocketEvents events = context.HandleSyncEventsSpeculatively(socketEvent.Events);
if (events != Interop.Sys.SocketEvents.None)
if (context.PreferInlineCompletions)
{
context.HandleEventsInline(socketEvent.Events);
}
else
{
var ev = new SocketIOEvent(context, events);
eventQueue.Enqueue(ev);
enqueuedEvent = true;

// This is necessary when the JIT generates unoptimized code (debug builds, live debugging,
// quick JIT, etc.) to ensure that the context does not remain referenced by this method, as
// such code may keep the stack location live for longer than necessary
ev = default;
Interop.Sys.SocketEvents events = context.HandleSyncEventsSpeculatively(socketEvent.Events);

if (events != Interop.Sys.SocketEvents.None)
{
var ev = new SocketIOEvent(context, events);
eventQueue.Enqueue(ev);
enqueuedEvent = true;

// This is necessary when the JIT generates unoptimized code (debug builds, live debugging,
// quick JIT, etc.) to ensure that the context does not remain referenced by this method, as
// such code may keep the stack location live for longer than necessary
ev = default;
}
}

// This is necessary when the JIT generates unoptimized code (debug builds, live debugging,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.IO;
using System.Diagnostics;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
using Microsoft.DotNet.RemoteExecutor;

namespace System.Net.Sockets.Tests
{
public class InlineContinuations
{
[OuterLoop]
[Fact]
[PlatformSpecific(TestPlatforms.AnyUnix)] // Inline Socket mode is specific to Unix Socket implementation.
public void InlineSocketContinuations()
{
RemoteInvokeOptions options = new RemoteInvokeOptions();
options.StartInfo.EnvironmentVariables.Add("DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS", "1");
options.TimeOut = (int)TimeSpan.FromMinutes(20).TotalMilliseconds;

RemoteExecutor.Invoke(async () =>
{
// Connect/Accept tests
await new AcceptEap(null).Accept_ConcurrentAcceptsBeforeConnects_Success(5);
await new AcceptEap(null).Accept_ConcurrentAcceptsAfterConnects_Success(5);

// Send/Receive tests
await new SendReceiveEap(null).SendRecv_Stream_TCP(IPAddress.Loopback, useMultipleBuffers: false);
await new SendReceiveEap(null).SendRecv_Stream_TCP_MultipleConcurrentReceives(IPAddress.Loopback, useMultipleBuffers: false);
await new SendReceiveEap(null).SendRecv_Stream_TCP_MultipleConcurrentSends(IPAddress.Loopback, useMultipleBuffers: false);
await new SendReceiveEap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: true);
await new SendReceiveEap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: false);
}, options).Dispose();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1764,6 +1764,7 @@ public async Task DisposedSocket_ThrowsOperationCanceledException()
public async Task BlockingAsyncContinuations_OperationsStillCompleteSuccessfully()
{
if (UsesSync) return;
if (Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS") == "1") return;

using (var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
using (var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<Compile Include="DnsEndPointTest.cs" />
<Compile Include="DualModeSocketTest.cs" />
<Compile Include="ExecutionContextFlowTest.cs" />
<Compile Include="InlineCompletions.Unix.cs" Condition="'$(TargetsUnix)' == 'true'"/>
<Compile Include="IPPacketInformationTest.cs" />
<Compile Include="KeepAliveTest.cs" />
<Compile Include="LingerStateTest.cs" />
Expand Down