Skip to content

Commit 443b5eb

Browse files
committed
HTTP/3: Add IStreamAbortFeature
1 parent dd476b1 commit 443b5eb

File tree

9 files changed

+258
-118
lines changed

9 files changed

+258
-118
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
namespace Microsoft.AspNetCore.Connections.Features
5+
{
6+
/// <summary>
7+
/// Supports aborting one side of a connection stream.
8+
/// </summary>
9+
public interface IStreamAbortFeature
10+
{
11+
/// <summary>
12+
/// Abort the read side of the connection stream.
13+
/// </summary>
14+
/// <param name="errorCode">The error code to send with the abort.</param>
15+
/// <param name="abortReason">An optional <see cref="ConnectionAbortedException"/> describing the reason to abort the read side of the connection stream.</param>
16+
void AbortRead(long errorCode, ConnectionAbortedException abortReason);
17+
18+
/// <summary>
19+
/// Abort the write side of the connection stream.
20+
/// </summary>
21+
/// <param name="errorCode">The error code to send with the abort.</param>
22+
/// <param name="abortReason">An optional <see cref="ConnectionAbortedException"/> describing the reason to abort the write side of the connection stream.</param>
23+
void AbortWrite(long errorCode, ConnectionAbortedException abortReason);
24+
}
25+
}

src/Servers/Connections.Abstractions/src/PublicAPI.Unshipped.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ Microsoft.AspNetCore.Connections.Features.IConnectionSocketFeature
44
Microsoft.AspNetCore.Connections.Features.IConnectionSocketFeature.Socket.get -> System.Net.Sockets.Socket!
55
Microsoft.AspNetCore.Connections.Features.IPersistentStateFeature
66
Microsoft.AspNetCore.Connections.Features.IPersistentStateFeature.State.get -> System.Collections.Generic.IDictionary<object!, object?>!
7+
Microsoft.AspNetCore.Connections.Features.IStreamAbortFeature
8+
Microsoft.AspNetCore.Connections.Features.IStreamAbortFeature.AbortRead(long errorCode, Microsoft.AspNetCore.Connections.ConnectionAbortedException! abortReason) -> void
9+
Microsoft.AspNetCore.Connections.Features.IStreamAbortFeature.AbortWrite(long errorCode, Microsoft.AspNetCore.Connections.ConnectionAbortedException! abortReason) -> void
710
Microsoft.AspNetCore.Connections.IConnectionListener.AcceptAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<Microsoft.AspNetCore.Connections.ConnectionContext?>
811
Microsoft.AspNetCore.Connections.IMultiplexedConnectionBuilder
912
Microsoft.AspNetCore.Connections.IMultiplexedConnectionBuilder.ApplicationServices.get -> System.IServiceProvider!

src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ internal abstract partial class Http3Stream : HttpProtocol, IHttp3Stream, IHttpH
4343
private Http3StreamContext _context = default!;
4444
private IProtocolErrorCodeFeature _errorCodeFeature = default!;
4545
private IStreamIdFeature _streamIdFeature = default!;
46+
private IStreamAbortFeature _streamAbortFeature = default!;
4647
private int _isClosed;
4748
private readonly Http3RawFrame _incomingFrame = new Http3RawFrame();
4849
protected RequestHeaderParsingState _requestHeaderParsingState;
@@ -58,7 +59,6 @@ internal abstract partial class Http3Stream : HttpProtocol, IHttp3Stream, IHttpH
5859

5960
public bool EndStreamReceived => (_completionState & StreamCompletionFlags.EndStreamReceived) == StreamCompletionFlags.EndStreamReceived;
6061
private bool IsAborted => (_completionState & StreamCompletionFlags.Aborted) == StreamCompletionFlags.Aborted;
61-
internal bool RstStreamReceived => (_completionState & StreamCompletionFlags.RstStreamReceived) == StreamCompletionFlags.RstStreamReceived;
6262

6363
public Pipe RequestBodyPipe { get; private set; } = default!;
6464

@@ -87,6 +87,7 @@ public void Initialize(Http3StreamContext context)
8787

8888
_errorCodeFeature = _context.ConnectionFeatures.Get<IProtocolErrorCodeFeature>()!;
8989
_streamIdFeature = _context.ConnectionFeatures.Get<IStreamIdFeature>()!;
90+
_streamAbortFeature = _context.ConnectionFeatures.Get<IStreamAbortFeature>()!;
9091

9192
_appCompleted = null;
9293
_isClosed = 0;
@@ -369,18 +370,15 @@ private void CompleteStream(bool errored)
369370
Log.RequestBodyNotEntirelyRead(ConnectionIdFeature, TraceIdentifier);
370371
}
371372

372-
var (oldState, newState) = ApplyCompletionFlag(StreamCompletionFlags.Aborted);
373+
var (oldState, newState) = ApplyCompletionFlag(StreamCompletionFlags.AbortedRead);
373374
if (oldState != newState)
374375
{
375376
// https://quicwg.org/base-drafts/draft-ietf-quic-http.html#section-4.1-15
376377
// When the server does not need to receive the remainder of the request, it MAY abort reading
377378
// the request stream, send a complete response, and cleanly close the sending part of the stream.
378379
// The error code H3_NO_ERROR SHOULD be used when requesting that the client stop sending on the
379380
// request stream.
380-
381-
// TODO(JamesNK): Abort the read half of the stream with H3_NO_ERROR
382-
// https://github.com/dotnet/aspnetcore/issues/33575
383-
381+
_streamAbortFeature.AbortRead((long)Http3ErrorCode.NoError, new ConnectionAbortedException("The application completed without reading the entire request body."));
384382
RequestBodyPipe.Writer.Complete();
385383
}
386384

@@ -938,8 +936,8 @@ private enum PseudoHeaderFields
938936
private enum StreamCompletionFlags
939937
{
940938
None = 0,
941-
RstStreamReceived = 1,
942-
EndStreamReceived = 2,
939+
EndStreamReceived = 1,
940+
AbortedRead = 2,
943941
Aborted = 4,
944942
}
945943

src/Servers/Kestrel/Transport.Quic/src/Internal/IQuicTrace.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,7 @@ internal interface IQuicTrace : ILogger
2121
void StreamShutdownWrite(QuicStreamContext streamContext, string reason);
2222
void StreamAborted(QuicStreamContext streamContext, Exception ex);
2323
void StreamAbort(QuicStreamContext streamContext, string reason);
24+
void StreamAbortRead(QuicStreamContext streamContext, string reason);
25+
void StreamAbortWrite(QuicStreamContext streamContext, string reason);
2426
}
2527
}

src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs

Lines changed: 101 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ internal partial class QuicStreamContext : TransportConnection, IPooledStream
3535
private CancellationTokenSource _streamClosedTokenSource = default!;
3636
private string? _connectionId;
3737
private const int MinAllocBufferSize = 4096;
38+
private volatile Exception? _shutdownReadReason;
39+
private volatile Exception? _shutdownWriteReason;
3840
private volatile Exception? _shutdownReason;
3941
private bool _streamClosed;
4042
private bool _serverAborted;
@@ -172,7 +174,42 @@ private async Task DoReceive()
172174

173175
try
174176
{
175-
await ProcessReceives();
177+
var input = Input;
178+
while (true)
179+
{
180+
var buffer = Input.GetMemory(MinAllocBufferSize);
181+
var bytesReceived = await _stream.ReadAsync(buffer);
182+
183+
if (bytesReceived == 0)
184+
{
185+
// Read completed.
186+
break;
187+
}
188+
189+
input.Advance(bytesReceived);
190+
191+
var flushTask = input.FlushAsync();
192+
193+
var paused = !flushTask.IsCompleted;
194+
195+
if (paused)
196+
{
197+
_log.StreamPause(this);
198+
}
199+
200+
var result = await flushTask;
201+
202+
if (paused)
203+
{
204+
_log.StreamResume(this);
205+
}
206+
207+
if (result.IsCompleted || result.IsCanceled)
208+
{
209+
// Pipe consumer is shut down, do we stop writing
210+
break;
211+
}
212+
}
176213
}
177214
catch (QuicStreamAbortedException ex)
178215
{
@@ -201,54 +238,14 @@ private async Task DoReceive()
201238
finally
202239
{
203240
// If Shutdown() has already bee called, assume that was the reason ProcessReceives() exited.
204-
Input.Complete(_shutdownReason ?? error);
241+
Input.Complete(_shutdownReadReason ?? _shutdownReason ?? error);
205242

206243
FireStreamClosed();
207244

208245
await _waitForConnectionClosedTcs.Task;
209246
}
210247
}
211248

212-
private async Task ProcessReceives()
213-
{
214-
var input = Input;
215-
while (true)
216-
{
217-
var buffer = Input.GetMemory(MinAllocBufferSize);
218-
var bytesReceived = await _stream.ReadAsync(buffer);
219-
220-
if (bytesReceived == 0)
221-
{
222-
// Read completed.
223-
break;
224-
}
225-
226-
input.Advance(bytesReceived);
227-
228-
var flushTask = input.FlushAsync();
229-
230-
var paused = !flushTask.IsCompleted;
231-
232-
if (paused)
233-
{
234-
_log.StreamPause(this);
235-
}
236-
237-
var result = await flushTask;
238-
239-
if (paused)
240-
{
241-
_log.StreamResume(this);
242-
}
243-
244-
if (result.IsCompleted || result.IsCanceled)
245-
{
246-
// Pipe consumer is shut down, do we stop writing
247-
break;
248-
}
249-
}
250-
}
251-
252249
private void FireStreamClosed()
253250
{
254251
// Guard against scheduling this multiple times
@@ -288,7 +285,34 @@ private async Task DoSend()
288285

289286
try
290287
{
291-
await ProcessSends();
288+
// Resolve `output` PipeReader via the IDuplexPipe interface prior to loop start for performance.
289+
var output = Output;
290+
while (true)
291+
{
292+
var result = await output.ReadAsync();
293+
294+
if (result.IsCanceled)
295+
{
296+
break;
297+
}
298+
299+
var buffer = result.Buffer;
300+
301+
var end = buffer.End;
302+
var isCompleted = result.IsCompleted;
303+
if (!buffer.IsEmpty)
304+
{
305+
await _stream.WriteAsync(buffer, endStream: isCompleted);
306+
}
307+
308+
output.AdvanceTo(end);
309+
310+
if (isCompleted)
311+
{
312+
// Once the stream pipe is closed, shutdown the stream.
313+
break;
314+
}
315+
}
292316
}
293317
catch (QuicStreamAbortedException ex)
294318
{
@@ -326,38 +350,6 @@ private async Task DoSend()
326350
}
327351
}
328352

329-
private async Task ProcessSends()
330-
{
331-
// Resolve `output` PipeReader via the IDuplexPipe interface prior to loop start for performance.
332-
var output = Output;
333-
while (true)
334-
{
335-
var result = await output.ReadAsync();
336-
337-
if (result.IsCanceled)
338-
{
339-
break;
340-
}
341-
342-
var buffer = result.Buffer;
343-
344-
var end = buffer.End;
345-
var isCompleted = result.IsCompleted;
346-
if (!buffer.IsEmpty)
347-
{
348-
await _stream.WriteAsync(buffer, endStream: isCompleted);
349-
}
350-
351-
output.AdvanceTo(end);
352-
353-
if (isCompleted)
354-
{
355-
// Once the stream pipe is closed, shutdown the stream.
356-
break;
357-
}
358-
}
359-
}
360-
361353
public override void Abort(ConnectionAbortedException abortReason)
362354
{
363355
// This abort is called twice, make sure that doesn't happen.
@@ -387,6 +379,40 @@ public override void Abort(ConnectionAbortedException abortReason)
387379
Output.CancelPendingRead();
388380
}
389381

382+
public void AbortRead(long errorCode, ConnectionAbortedException abortReason)
383+
{
384+
lock (_shutdownLock)
385+
{
386+
if (_stream.CanRead)
387+
{
388+
_shutdownReadReason = abortReason;
389+
_log.StreamAbortRead(this, abortReason.Message);
390+
_stream.AbortRead(errorCode);
391+
}
392+
else
393+
{
394+
throw new InvalidOperationException("Unable to abort reading from a stream that doesn't support reading.");
395+
}
396+
}
397+
}
398+
399+
public void AbortWrite(long errorCode, ConnectionAbortedException abortReason)
400+
{
401+
lock (_shutdownLock)
402+
{
403+
if (_stream.CanWrite)
404+
{
405+
_shutdownWriteReason = abortReason;
406+
_log.StreamAbortWrite(this, abortReason.Message);
407+
_stream.AbortWrite(errorCode);
408+
}
409+
else
410+
{
411+
throw new InvalidOperationException("Unable to abort writing to a stream that doesn't support writing.");
412+
}
413+
}
414+
}
415+
390416
private async ValueTask ShutdownWrite(Exception? shutdownReason)
391417
{
392418
try

src/Servers/Kestrel/Transport.Quic/src/Internal/QuicTrace.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ internal class QuicTrace : IQuicTrace
3535
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(11, "StreamAborted"), @"Stream id ""{ConnectionId}"" aborted by peer.", SkipEnabledCheckLogOptions);
3636
private static readonly Action<ILogger, string, string, Exception?> _streamAbort =
3737
LoggerMessage.Define<string, string>(LogLevel.Debug, new EventId(12, "StreamAbort"), @"Stream id ""{ConnectionId}"" aborted by application because: ""{Reason}"".", SkipEnabledCheckLogOptions);
38+
private static readonly Action<ILogger, string, string, Exception?> _streamAbortRead =
39+
LoggerMessage.Define<string, string>(LogLevel.Debug, new EventId(13, "StreamAbortRead"), @"Stream id ""{ConnectionId}"" read side aborted by application because: ""{Reason}"".", SkipEnabledCheckLogOptions);
40+
private static readonly Action<ILogger, string, string, Exception?> _streamAbortWrite =
41+
LoggerMessage.Define<string, string>(LogLevel.Debug, new EventId(14, "StreamAbortWrite"), @"Stream id ""{ConnectionId}"" write side aborted by application because: ""{Reason}"".", SkipEnabledCheckLogOptions);
3842

3943
private readonly ILogger _logger;
4044

@@ -146,6 +150,22 @@ public void StreamAbort(QuicStreamContext streamContext, string reason)
146150
}
147151
}
148152

153+
public void StreamAbortRead(QuicStreamContext streamContext, string reason)
154+
{
155+
if (_logger.IsEnabled(LogLevel.Debug))
156+
{
157+
_streamAbortRead(_logger, streamContext.ConnectionId, reason, null);
158+
}
159+
}
160+
161+
public void StreamAbortWrite(QuicStreamContext streamContext, string reason)
162+
{
163+
if (_logger.IsEnabled(LogLevel.Debug))
164+
{
165+
_streamAbortWrite(_logger, streamContext.ConnectionId, reason, null);
166+
}
167+
}
168+
149169
private StreamType GetStreamType(QuicStreamContext streamContext) =>
150170
streamContext.CanRead && streamContext.CanWrite
151171
? StreamType.Bidirectional

0 commit comments

Comments
 (0)