Skip to content

Commit f5e411e

Browse files
authored
HTTP/3: Add IStreamAbortFeature (#34409)
1 parent 929c19c commit f5e411e

File tree

14 files changed

+286
-121
lines changed

14 files changed

+286
-121
lines changed

src/Servers/Connections.Abstractions/src/BaseConnectionContext.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public abstract class BaseConnectionContext : IAsyncDisposable
5353
/// <summary>
5454
/// Aborts the underlying connection.
5555
/// </summary>
56-
/// <param name="abortReason">An optional <see cref="ConnectionAbortedException"/> describing the reason the connection is being terminated.</param>
56+
/// <param name="abortReason">A <see cref="ConnectionAbortedException"/> describing the reason the connection is being terminated.</param>
5757
public abstract void Abort(ConnectionAbortedException abortReason);
5858

5959
/// <summary>

src/Servers/Connections.Abstractions/src/ConnectionContext.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public abstract class ConnectionContext : BaseConnectionContext, IAsyncDisposabl
2020
/// <summary>
2121
/// Aborts the underlying connection.
2222
/// </summary>
23-
/// <param name="abortReason">An optional <see cref="ConnectionAbortedException"/> describing the reason the connection is being terminated.</param>
23+
/// <param name="abortReason">A <see cref="ConnectionAbortedException"/> describing the reason the connection is being terminated.</param>
2424
public override void Abort(ConnectionAbortedException abortReason)
2525
{
2626
// We expect this to be overridden, but this helps maintain back compat
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
namespace Microsoft.AspNetCore.Connections.Features
5+
{
6+
/// <summary>
7+
/// Supports aborting individual sides of a connection stream.
8+
/// </summary>
9+
public interface IStreamAbortFeature
10+
{
11+
/// <summary>
12+
/// Abort the read side of the connection stream.
13+
/// </summary>
14+
/// <param name="errorCode">The error code to send with the abort.</param>
15+
/// <param name="abortReason">A <see cref="ConnectionAbortedException"/> describing the reason to abort the read side of the connection stream.</param>
16+
void AbortRead(long errorCode, ConnectionAbortedException abortReason);
17+
18+
/// <summary>
19+
/// Abort the write side of the connection stream.
20+
/// </summary>
21+
/// <param name="errorCode">The error code to send with the abort.</param>
22+
/// <param name="abortReason">A <see cref="ConnectionAbortedException"/> describing the reason to abort the write side of the connection stream.</param>
23+
void AbortWrite(long errorCode, ConnectionAbortedException abortReason);
24+
}
25+
}

src/Servers/Connections.Abstractions/src/PublicAPI.Unshipped.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ Microsoft.AspNetCore.Connections.Features.IConnectionSocketFeature
44
Microsoft.AspNetCore.Connections.Features.IConnectionSocketFeature.Socket.get -> System.Net.Sockets.Socket!
55
Microsoft.AspNetCore.Connections.Features.IPersistentStateFeature
66
Microsoft.AspNetCore.Connections.Features.IPersistentStateFeature.State.get -> System.Collections.Generic.IDictionary<object!, object?>!
7+
Microsoft.AspNetCore.Connections.Features.IStreamAbortFeature
8+
Microsoft.AspNetCore.Connections.Features.IStreamAbortFeature.AbortRead(long errorCode, Microsoft.AspNetCore.Connections.ConnectionAbortedException! abortReason) -> void
9+
Microsoft.AspNetCore.Connections.Features.IStreamAbortFeature.AbortWrite(long errorCode, Microsoft.AspNetCore.Connections.ConnectionAbortedException! abortReason) -> void
710
Microsoft.AspNetCore.Connections.IConnectionListener.AcceptAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<Microsoft.AspNetCore.Connections.ConnectionContext?>
811
Microsoft.AspNetCore.Connections.IMultiplexedConnectionBuilder
912
Microsoft.AspNetCore.Connections.IMultiplexedConnectionBuilder.ApplicationServices.get -> System.IServiceProvider!

src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ internal abstract partial class Http3Stream : HttpProtocol, IHttp3Stream, IHttpH
4343
private Http3StreamContext _context = default!;
4444
private IProtocolErrorCodeFeature _errorCodeFeature = default!;
4545
private IStreamIdFeature _streamIdFeature = default!;
46+
private IStreamAbortFeature _streamAbortFeature = default!;
4647
private int _isClosed;
4748
private readonly Http3RawFrame _incomingFrame = new Http3RawFrame();
4849
protected RequestHeaderParsingState _requestHeaderParsingState;
@@ -58,7 +59,6 @@ internal abstract partial class Http3Stream : HttpProtocol, IHttp3Stream, IHttpH
5859

5960
public bool EndStreamReceived => (_completionState & StreamCompletionFlags.EndStreamReceived) == StreamCompletionFlags.EndStreamReceived;
6061
private bool IsAborted => (_completionState & StreamCompletionFlags.Aborted) == StreamCompletionFlags.Aborted;
61-
internal bool RstStreamReceived => (_completionState & StreamCompletionFlags.RstStreamReceived) == StreamCompletionFlags.RstStreamReceived;
6262

6363
public Pipe RequestBodyPipe { get; private set; } = default!;
6464

@@ -87,6 +87,7 @@ public void Initialize(Http3StreamContext context)
8787

8888
_errorCodeFeature = _context.ConnectionFeatures.Get<IProtocolErrorCodeFeature>()!;
8989
_streamIdFeature = _context.ConnectionFeatures.Get<IStreamIdFeature>()!;
90+
_streamAbortFeature = _context.ConnectionFeatures.Get<IStreamAbortFeature>()!;
9091

9192
_appCompleted = null;
9293
_isClosed = 0;
@@ -371,18 +372,15 @@ private void CompleteStream(bool errored)
371372
Log.RequestBodyNotEntirelyRead(ConnectionIdFeature, TraceIdentifier);
372373
}
373374

374-
var (oldState, newState) = ApplyCompletionFlag(StreamCompletionFlags.Aborted);
375+
var (oldState, newState) = ApplyCompletionFlag(StreamCompletionFlags.AbortedRead);
375376
if (oldState != newState)
376377
{
377378
// https://quicwg.org/base-drafts/draft-ietf-quic-http.html#section-4.1-15
378379
// When the server does not need to receive the remainder of the request, it MAY abort reading
379380
// the request stream, send a complete response, and cleanly close the sending part of the stream.
380381
// The error code H3_NO_ERROR SHOULD be used when requesting that the client stop sending on the
381382
// request stream.
382-
383-
// TODO(JamesNK): Abort the read half of the stream with H3_NO_ERROR
384-
// https://github.com/dotnet/aspnetcore/issues/33575
385-
383+
_streamAbortFeature.AbortRead((long)Http3ErrorCode.NoError, new ConnectionAbortedException("The application completed without reading the entire request body."));
386384
RequestBodyPipe.Writer.Complete();
387385
}
388386

@@ -940,8 +938,8 @@ private enum PseudoHeaderFields
940938
private enum StreamCompletionFlags
941939
{
942940
None = 0,
943-
RstStreamReceived = 1,
944-
EndStreamReceived = 2,
941+
EndStreamReceived = 1,
942+
AbortedRead = 2,
945943
Aborted = 4,
946944
}
947945

src/Servers/Kestrel/Transport.Quic/src/Internal/IQuicTrace.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,7 @@ internal interface IQuicTrace : ILogger
2121
void StreamShutdownWrite(QuicStreamContext streamContext, string reason);
2222
void StreamAborted(QuicStreamContext streamContext, Exception ex);
2323
void StreamAbort(QuicStreamContext streamContext, string reason);
24+
void StreamAbortRead(QuicStreamContext streamContext, string reason);
25+
void StreamAbortWrite(QuicStreamContext streamContext, string reason);
2426
}
2527
}

src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.FeatureCollection.cs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal
99
{
10-
internal sealed partial class QuicStreamContext : IPersistentStateFeature, IStreamDirectionFeature, IProtocolErrorCodeFeature, IStreamIdFeature
10+
internal sealed partial class QuicStreamContext : IPersistentStateFeature, IStreamDirectionFeature, IProtocolErrorCodeFeature, IStreamIdFeature, IStreamAbortFeature
1111
{
1212
private IDictionary<object, object?>? _persistentState;
1313

@@ -27,12 +27,47 @@ internal sealed partial class QuicStreamContext : IPersistentStateFeature, IStre
2727
}
2828
}
2929

30+
public void AbortRead(long errorCode, ConnectionAbortedException abortReason)
31+
{
32+
lock (_shutdownLock)
33+
{
34+
if (_stream.CanRead)
35+
{
36+
_shutdownReadReason = abortReason;
37+
_log.StreamAbortRead(this, abortReason.Message);
38+
_stream.AbortRead(errorCode);
39+
}
40+
else
41+
{
42+
throw new InvalidOperationException("Unable to abort reading from a stream that doesn't support reading.");
43+
}
44+
}
45+
}
46+
47+
public void AbortWrite(long errorCode, ConnectionAbortedException abortReason)
48+
{
49+
lock (_shutdownLock)
50+
{
51+
if (_stream.CanWrite)
52+
{
53+
_shutdownWriteReason = abortReason;
54+
_log.StreamAbortWrite(this, abortReason.Message);
55+
_stream.AbortWrite(errorCode);
56+
}
57+
else
58+
{
59+
throw new InvalidOperationException("Unable to abort writing to a stream that doesn't support writing.");
60+
}
61+
}
62+
}
63+
3064
private void InitializeFeatures()
3165
{
3266
_currentIPersistentStateFeature = this;
3367
_currentIStreamDirectionFeature = this;
3468
_currentIProtocolErrorCodeFeature = this;
3569
_currentIStreamIdFeature = this;
70+
_currentIStreamAbortFeature = this;
3671
_currentITlsConnectionFeature = _connection._currentITlsConnectionFeature;
3772
}
3873
}

src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs

Lines changed: 67 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ internal partial class QuicStreamContext : TransportConnection, IPooledStream
3535
private CancellationTokenSource _streamClosedTokenSource = default!;
3636
private string? _connectionId;
3737
private const int MinAllocBufferSize = 4096;
38+
private volatile Exception? _shutdownReadReason;
39+
private volatile Exception? _shutdownWriteReason;
3840
private volatile Exception? _shutdownReason;
3941
private bool _streamClosed;
4042
private bool _serverAborted;
@@ -175,7 +177,42 @@ private async Task DoReceive()
175177

176178
try
177179
{
178-
await ProcessReceives();
180+
var input = Input;
181+
while (true)
182+
{
183+
var buffer = Input.GetMemory(MinAllocBufferSize);
184+
var bytesReceived = await _stream.ReadAsync(buffer);
185+
186+
if (bytesReceived == 0)
187+
{
188+
// Read completed.
189+
break;
190+
}
191+
192+
input.Advance(bytesReceived);
193+
194+
var flushTask = input.FlushAsync();
195+
196+
var paused = !flushTask.IsCompleted;
197+
198+
if (paused)
199+
{
200+
_log.StreamPause(this);
201+
}
202+
203+
var result = await flushTask;
204+
205+
if (paused)
206+
{
207+
_log.StreamResume(this);
208+
}
209+
210+
if (result.IsCompleted || result.IsCanceled)
211+
{
212+
// Pipe consumer is shut down, do we stop writing
213+
break;
214+
}
215+
}
179216
}
180217
catch (QuicStreamAbortedException ex)
181218
{
@@ -204,54 +241,14 @@ private async Task DoReceive()
204241
finally
205242
{
206243
// If Shutdown() has already bee called, assume that was the reason ProcessReceives() exited.
207-
Input.Complete(_shutdownReason ?? error);
244+
Input.Complete(_shutdownReadReason ?? _shutdownReason ?? error);
208245

209246
FireStreamClosed();
210247

211248
await _waitForConnectionClosedTcs.Task;
212249
}
213250
}
214251

215-
private async Task ProcessReceives()
216-
{
217-
var input = Input;
218-
while (true)
219-
{
220-
var buffer = Input.GetMemory(MinAllocBufferSize);
221-
var bytesReceived = await _stream.ReadAsync(buffer);
222-
223-
if (bytesReceived == 0)
224-
{
225-
// Read completed.
226-
break;
227-
}
228-
229-
input.Advance(bytesReceived);
230-
231-
var flushTask = input.FlushAsync();
232-
233-
var paused = !flushTask.IsCompleted;
234-
235-
if (paused)
236-
{
237-
_log.StreamPause(this);
238-
}
239-
240-
var result = await flushTask;
241-
242-
if (paused)
243-
{
244-
_log.StreamResume(this);
245-
}
246-
247-
if (result.IsCompleted || result.IsCanceled)
248-
{
249-
// Pipe consumer is shut down, do we stop writing
250-
break;
251-
}
252-
}
253-
}
254-
255252
private void FireStreamClosed()
256253
{
257254
// Guard against scheduling this multiple times
@@ -291,7 +288,34 @@ private async Task DoSend()
291288

292289
try
293290
{
294-
await ProcessSends();
291+
// Resolve `output` PipeReader via the IDuplexPipe interface prior to loop start for performance.
292+
var output = Output;
293+
while (true)
294+
{
295+
var result = await output.ReadAsync();
296+
297+
if (result.IsCanceled)
298+
{
299+
break;
300+
}
301+
302+
var buffer = result.Buffer;
303+
304+
var end = buffer.End;
305+
var isCompleted = result.IsCompleted;
306+
if (!buffer.IsEmpty)
307+
{
308+
await _stream.WriteAsync(buffer, endStream: isCompleted);
309+
}
310+
311+
output.AdvanceTo(end);
312+
313+
if (isCompleted)
314+
{
315+
// Once the stream pipe is closed, shutdown the stream.
316+
break;
317+
}
318+
}
295319
}
296320
catch (QuicStreamAbortedException ex)
297321
{
@@ -329,38 +353,6 @@ private async Task DoSend()
329353
}
330354
}
331355

332-
private async Task ProcessSends()
333-
{
334-
// Resolve `output` PipeReader via the IDuplexPipe interface prior to loop start for performance.
335-
var output = Output;
336-
while (true)
337-
{
338-
var result = await output.ReadAsync();
339-
340-
if (result.IsCanceled)
341-
{
342-
break;
343-
}
344-
345-
var buffer = result.Buffer;
346-
347-
var end = buffer.End;
348-
var isCompleted = result.IsCompleted;
349-
if (!buffer.IsEmpty)
350-
{
351-
await _stream.WriteAsync(buffer, endStream: isCompleted);
352-
}
353-
354-
output.AdvanceTo(end);
355-
356-
if (isCompleted)
357-
{
358-
// Once the stream pipe is closed, shutdown the stream.
359-
break;
360-
}
361-
}
362-
}
363-
364356
public override void Abort(ConnectionAbortedException abortReason)
365357
{
366358
// This abort is called twice, make sure that doesn't happen.

src/Servers/Kestrel/Transport.Quic/src/Internal/QuicTrace.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,28 @@ public void StreamAbort(QuicStreamContext streamContext, string reason)
154154
}
155155
}
156156

157+
[LoggerMessage(13, LogLevel.Debug, @"Stream id ""{ConnectionId}"" read side aborted by application because: ""{Reason}"".", SkipEnabledCheck = true)]
158+
private static partial void StreamAbortRead(ILogger logger, string connectionId, string reason);
159+
160+
public void StreamAbortRead(QuicStreamContext streamContext, string reason)
161+
{
162+
if (_logger.IsEnabled(LogLevel.Debug))
163+
{
164+
StreamAbortRead(_logger, streamContext.ConnectionId, reason);
165+
}
166+
}
167+
168+
[LoggerMessage(14, LogLevel.Debug, @"Stream id ""{ConnectionId}"" write side aborted by application because: ""{Reason}"".", SkipEnabledCheck = true)]
169+
private static partial void StreamAbortWrite(ILogger logger, string connectionId, string reason);
170+
171+
public void StreamAbortWrite(QuicStreamContext streamContext, string reason)
172+
{
173+
if (_logger.IsEnabled(LogLevel.Debug))
174+
{
175+
StreamAbortWrite(_logger, streamContext.ConnectionId, reason);
176+
}
177+
}
178+
157179
private static StreamType GetStreamType(QuicStreamContext streamContext) =>
158180
streamContext.CanRead && streamContext.CanWrite
159181
? StreamType.Bidirectional

0 commit comments

Comments
 (0)