Skip to content

Commit 3d2f172

Browse files
authored
Reduce thread consumption for sync HTTP/2 reads (#32917)
The response Stream returned from SocketsHttpHandler.SendAsync supports all of the various forms of read operations, including synchronous reads. For HTTP/1.x, we're able to implement these as sync all the way down to synchronous calls on the underlying Socket, as every request/response owns its own connection (at least for the duration of the operation; the connection may be returned to the pool for subsequent reuse after). For HTTP/2, however, a single connection is shared across all requests multiplexed onto it, and as such, reads on those individual responses need to coordinate, which can mean waiting one's turn. Such waiting for asynchronous operations is implemented via async/await and a ManualResetValueTaskSource-based implementation. For the sync operations, though, they're implemented as simply blocking waiting on the async equivalent, aka sync-over-async. This not only blocks the calling thread, but because the MRVTS-based implementation specifies RunContinuationsAsynchronously==true (which is important for reliability with how the async code paths are structured) it means that, in order to unblock this waiter, another thread pool thread needs to be available to run the small continuation that will in turn unblock the sync primitive being blocked on. This PR tweaks the use of MRVTS so that when we perform a synchronous read, we set MRVTS.RunContinuationsAsynchronously to false. Since the only action that will be taken as part of the continuation is to set a sync primitive, we don't need to worry about arbitrary code running as part of the completing call stack. This in turn means that in the common case, sync reads will still end up blocking the calling thread _but_ won't require an additional thread pool thread to unblock it, other than for the task running the main connection loop.
1 parent bf03034 commit 3d2f172

File tree

1 file changed

+87
-31
lines changed
  • src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler

1 file changed

+87
-31
lines changed

src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Stream.cs

Lines changed: 87 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -836,7 +836,7 @@ public async Task ReadResponseHeadersAsync(CancellationToken cancellationToken)
836836
(wait, emptyResponse) = TryEnsureHeaders();
837837
if (wait)
838838
{
839-
await GetWaiterTask(cancellationToken).ConfigureAwait(false);
839+
await WaitForDataAsync(cancellationToken).ConfigureAwait(false);
840840

841841
(wait, emptyResponse) = TryEnsureHeaders();
842842
Debug.Assert(!wait);
@@ -893,7 +893,7 @@ private void ExtendWindow(int amount)
893893
_connection.LogExceptions(_connection.SendWindowUpdateAsync(_streamId, windowUpdateSize));
894894
}
895895

896-
private (bool wait, int bytesRead) TryReadFromBuffer(Span<byte> buffer)
896+
private (bool wait, int bytesRead) TryReadFromBuffer(Span<byte> buffer, bool partOfSyncRead = false)
897897
{
898898
Debug.Assert(buffer.Length > 0);
899899

@@ -920,6 +920,7 @@ private void ExtendWindow(int amount)
920920
Debug.Assert(!_hasWaiter);
921921
_hasWaiter = true;
922922
_waitSource.Reset();
923+
_waitSource.RunContinuationsAsynchronously = !partOfSyncRead;
923924
return (true, 0);
924925
}
925926
}
@@ -931,13 +932,13 @@ public int ReadData(Span<byte> buffer, HttpResponseMessage responseMessage)
931932
return 0;
932933
}
933934

934-
(bool wait, int bytesRead) = TryReadFromBuffer(buffer);
935+
(bool wait, int bytesRead) = TryReadFromBuffer(buffer, partOfSyncRead: true);
935936
if (wait)
936937
{
937938
// Synchronously block waiting for data to be produced.
938939
Debug.Assert(bytesRead == 0);
939-
GetWaiterTask(default).AsTask().GetAwaiter().GetResult();
940-
(wait, bytesRead) = TryReadFromBuffer(buffer);
940+
WaitForData();
941+
(wait, bytesRead) = TryReadFromBuffer(buffer, partOfSyncRead: true);
941942
Debug.Assert(!wait);
942943
}
943944

@@ -965,7 +966,7 @@ public async ValueTask<int> ReadDataAsync(Memory<byte> buffer, HttpResponseMessa
965966
if (wait)
966967
{
967968
Debug.Assert(bytesRead == 0);
968-
await GetWaiterTask(cancellationToken).ConfigureAwait(false);
969+
await WaitForDataAsync(cancellationToken).ConfigureAwait(false);
969970
(wait, bytesRead) = TryReadFromBuffer(buffer.Span);
970971
Debug.Assert(!wait);
971972
}
@@ -983,6 +984,42 @@ public async ValueTask<int> ReadDataAsync(Memory<byte> buffer, HttpResponseMessa
983984
return bytesRead;
984985
}
985986

987+
public void CopyTo(HttpResponseMessage responseMessage, Stream destination, int bufferSize)
988+
{
989+
byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
990+
try
991+
{
992+
// Generallly the same logic as in ReadData, but wrapped in a loop where every read segment is written to the destination.
993+
while (true)
994+
{
995+
(bool wait, int bytesRead) = TryReadFromBuffer(buffer, partOfSyncRead: true);
996+
if (wait)
997+
{
998+
Debug.Assert(bytesRead == 0);
999+
WaitForData();
1000+
(wait, bytesRead) = TryReadFromBuffer(buffer, partOfSyncRead: true);
1001+
Debug.Assert(!wait);
1002+
}
1003+
1004+
if (bytesRead != 0)
1005+
{
1006+
ExtendWindow(bytesRead);
1007+
destination.Write(new ReadOnlySpan<byte>(buffer, 0, bytesRead));
1008+
}
1009+
else
1010+
{
1011+
// We've hit EOF. Pull in from the Http2Stream any trailers that were temporarily stored there.
1012+
CopyTrailersToResponseMessage(responseMessage);
1013+
return;
1014+
}
1015+
}
1016+
}
1017+
finally
1018+
{
1019+
ArrayPool<byte>.Shared.Return(buffer);
1020+
}
1021+
}
1022+
9861023
public async Task CopyToAsync(HttpResponseMessage responseMessage, Stream destination, int bufferSize, CancellationToken cancellationToken)
9871024
{
9881025
byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
@@ -995,7 +1032,7 @@ public async Task CopyToAsync(HttpResponseMessage responseMessage, Stream destin
9951032
if (wait)
9961033
{
9971034
Debug.Assert(bytesRead == 0);
998-
await GetWaiterTask(cancellationToken).ConfigureAwait(false);
1035+
await WaitForDataAsync(cancellationToken).ConfigureAwait(false);
9991036
(wait, bytesRead) = TryReadFromBuffer(buffer);
10001037
Debug.Assert(!wait);
10011038
}
@@ -1122,13 +1159,27 @@ private void CloseResponseBody()
11221159
ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _waitSource.GetStatus(token);
11231160
void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _waitSource.OnCompleted(continuation, state, token, flags);
11241161
void IValueTaskSource.GetResult(short token) => _waitSource.GetResult(token);
1125-
private ValueTask GetWaiterTask(CancellationToken cancellationToken)
1162+
1163+
private void WaitForData()
11261164
{
1165+
// See comments in WaitAsync.
1166+
Debug.Assert(!_waitSource.RunContinuationsAsynchronously);
1167+
new ValueTask(this, _waitSource.Version).AsTask().GetAwaiter().GetResult();
1168+
}
1169+
1170+
private ValueTask WaitForDataAsync(CancellationToken cancellationToken)
1171+
{
1172+
Debug.Assert(_waitSource.RunContinuationsAsynchronously);
1173+
11271174
// No locking is required here to access _waitSource. To be here, we've already updated _hasWaiter (while holding the lock)
11281175
// to indicate that we would be creating this waiter, and at that point the only code that could be await'ing _waitSource or
11291176
// Reset'ing it is this code here. It's possible for this to race with the _waitSource being completed, but that's ok and is
11301177
// handled by _waitSource as one of its primary purposes. We can't assert _hasWaiter here, though, as once we released the
11311178
// lock, a producer could have seen _hasWaiter as true and both set it to false and signaled _waitSource.
1179+
if (!cancellationToken.CanBeCanceled)
1180+
{
1181+
return new ValueTask(this, _waitSource.Version);
1182+
}
11321183

11331184
// With HttpClient, the supplied cancellation token will always be cancelable, as HttpClient supplies a token that
11341185
// will have cancellation requested if CancelPendingRequests is called (or when a non-infinite Timeout expires).
@@ -1137,36 +1188,34 @@ private ValueTask GetWaiterTask(CancellationToken cancellationToken)
11371188
// this pay-for-play: if the token isn't cancelable, return a ValueTask wrapping this object directly, and only
11381189
// if it is cancelable, then register for the cancellation callback, allocate a task for the asynchronously
11391190
// completing case, etc.
1140-
return cancellationToken.CanBeCanceled ?
1141-
GetCancelableWaiterTask(cancellationToken) :
1142-
new ValueTask(this, _waitSource.Version);
1143-
}
1191+
return GetCancelableWaiterTask(cancellationToken);
11441192

1145-
private async ValueTask GetCancelableWaiterTask(CancellationToken cancellationToken)
1146-
{
1147-
using (cancellationToken.UnsafeRegister(s =>
1193+
async ValueTask GetCancelableWaiterTask(CancellationToken cancellationToken)
11481194
{
1149-
var thisRef = (Http2Stream)s;
1150-
1151-
bool signalWaiter;
1152-
Debug.Assert(!Monitor.IsEntered(thisRef.SyncObject));
1153-
lock (thisRef.SyncObject)
1195+
using (cancellationToken.UnsafeRegister(s =>
11541196
{
1155-
signalWaiter = thisRef._hasWaiter;
1156-
thisRef._hasWaiter = false;
1157-
}
1197+
var thisRef = (Http2Stream)s;
11581198

1159-
if (signalWaiter)
1199+
bool signalWaiter;
1200+
Debug.Assert(!Monitor.IsEntered(thisRef.SyncObject));
1201+
lock (thisRef.SyncObject)
1202+
{
1203+
signalWaiter = thisRef._hasWaiter;
1204+
thisRef._hasWaiter = false;
1205+
}
1206+
1207+
if (signalWaiter)
1208+
{
1209+
// Wake up the wait. It will then immediately check whether cancellation was requested and throw if it was.
1210+
thisRef._waitSource.SetResult(true);
1211+
}
1212+
}, this))
11601213
{
1161-
// Wake up the wait. It will then immediately check whether cancellation was requested and throw if it was.
1162-
thisRef._waitSource.SetResult(true);
1214+
await new ValueTask(this, _waitSource.Version).ConfigureAwait(false);
11631215
}
1164-
}, this))
1165-
{
1166-
await new ValueTask(this, _waitSource.Version).ConfigureAwait(false);
1167-
}
11681216

1169-
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
1217+
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
1218+
}
11701219
}
11711220

11721221
public void Trace(string message, [CallerMemberName] string memberName = null) =>
@@ -1262,6 +1311,13 @@ public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationT
12621311
return http2Stream.ReadDataAsync(destination, _responseMessage, cancellationToken);
12631312
}
12641313

1314+
public override void CopyTo(Stream destination, int bufferSize)
1315+
{
1316+
StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);
1317+
Http2Stream http2Stream = _http2Stream ?? throw ExceptionDispatchInfo.SetCurrentStackTrace(new ObjectDisposedException(nameof(Http2ReadStream)));
1318+
http2Stream.CopyTo(_responseMessage, destination, bufferSize);
1319+
}
1320+
12651321
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
12661322
{
12671323
StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);

0 commit comments

Comments
 (0)