Skip to content

Commit 16cff65

Browse files
authored
Preserve OCE.CancellationToken in ReadAtLeastAsync of StreamPipeReader (#83926)
1 parent eeb49c4 commit 16cff65

File tree

2 files changed

+32
-98
lines changed

2 files changed

+32
-98
lines changed

src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs

Lines changed: 19 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -200,94 +200,15 @@ private bool CompleteAndGetNeedsDispose()
200200
/// <inheritdoc />
201201
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
202202
{
203-
// TODO ReadyAsync needs to throw if there are overlapping reads.
204-
ThrowIfCompleted();
205-
206-
if (cancellationToken.IsCancellationRequested)
207-
{
208-
return new ValueTask<ReadResult>(Task.FromCanceled<ReadResult>(cancellationToken));
209-
}
210-
211-
// PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
212-
CancellationTokenSource tokenSource = InternalTokenSource;
213-
if (TryReadInternal(tokenSource, out ReadResult readResult))
214-
{
215-
return new ValueTask<ReadResult>(readResult);
216-
}
217-
218-
if (_isStreamCompleted)
219-
{
220-
ReadResult completedResult = new ReadResult(buffer: default, isCanceled: false, isCompleted: true);
221-
return new ValueTask<ReadResult>(completedResult);
222-
}
223-
224-
return Core(this, tokenSource, cancellationToken);
225-
226-
#if NETCOREAPP
227-
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
228-
#endif
229-
static async ValueTask<ReadResult> Core(StreamPipeReader reader, CancellationTokenSource tokenSource, CancellationToken cancellationToken)
230-
{
231-
CancellationTokenRegistration reg = default;
232-
if (cancellationToken.CanBeCanceled)
233-
{
234-
reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state!).Cancel(), reader);
235-
}
236-
237-
using (reg)
238-
{
239-
var isCanceled = false;
240-
try
241-
{
242-
// This optimization only makes sense if we don't have anything buffered
243-
if (reader.UseZeroByteReads && reader._bufferedBytes == 0)
244-
{
245-
// Wait for data by doing 0 byte read before
246-
await reader.InnerStream.ReadAsync(Memory<byte>.Empty, tokenSource.Token).ConfigureAwait(false);
247-
}
248-
249-
reader.AllocateReadTail();
250-
251-
Memory<byte> buffer = reader._readTail!.AvailableMemory.Slice(reader._readTail.End);
252-
253-
int length = await reader.InnerStream.ReadAsync(buffer, tokenSource.Token).ConfigureAwait(false);
254-
255-
Debug.Assert(length + reader._readTail.End <= reader._readTail.AvailableMemory.Length);
256-
257-
reader._readTail.End += length;
258-
reader._bufferedBytes += length;
259-
260-
if (length == 0)
261-
{
262-
reader._isStreamCompleted = true;
263-
}
264-
}
265-
catch (OperationCanceledException ex)
266-
{
267-
reader.ClearCancellationToken();
268-
269-
if (cancellationToken.IsCancellationRequested)
270-
{
271-
// Simulate an OCE triggered directly by the cancellationToken rather than the InternalTokenSource
272-
throw new OperationCanceledException(ex.Message, ex, cancellationToken);
273-
}
274-
else if (tokenSource.IsCancellationRequested)
275-
{
276-
// Catch cancellation and translate it into setting isCanceled = true
277-
isCanceled = true;
278-
}
279-
else
280-
{
281-
throw;
282-
}
283-
}
284-
285-
return new ReadResult(reader.GetCurrentReadOnlySequence(), isCanceled, reader._isStreamCompleted);
286-
}
287-
}
203+
return ReadInternalAsync(null, cancellationToken);
288204
}
289205

290206
protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, CancellationToken cancellationToken)
207+
{
208+
return ReadInternalAsync(minimumSize, cancellationToken);
209+
}
210+
211+
private ValueTask<ReadResult> ReadInternalAsync(int? minimumSize, CancellationToken cancellationToken)
291212
{
292213
// TODO ReadyAsync needs to throw if there are overlapping reads.
293214
ThrowIfCompleted();
@@ -301,7 +222,10 @@ protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, C
301222
CancellationTokenSource tokenSource = InternalTokenSource;
302223
if (TryReadInternal(tokenSource, out ReadResult readResult))
303224
{
304-
if (readResult.Buffer.Length >= minimumSize || readResult.IsCompleted || readResult.IsCanceled)
225+
if (minimumSize is null
226+
|| readResult.Buffer.Length >= minimumSize
227+
|| readResult.IsCompleted
228+
|| readResult.IsCanceled)
305229
{
306230
return new ValueTask<ReadResult>(readResult);
307231
}
@@ -318,7 +242,7 @@ protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, C
318242
#if NETCOREAPP
319243
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
320244
#endif
321-
static async ValueTask<ReadResult> Core(StreamPipeReader reader, int minimumSize, CancellationTokenSource tokenSource, CancellationToken cancellationToken)
245+
static async ValueTask<ReadResult> Core(StreamPipeReader reader, int? minimumSize, CancellationTokenSource tokenSource, CancellationToken cancellationToken)
322246
{
323247
CancellationTokenRegistration reg = default;
324248
if (cancellationToken.CanBeCanceled)
@@ -356,13 +280,18 @@ static async ValueTask<ReadResult> Core(StreamPipeReader reader, int minimumSize
356280
reader._isStreamCompleted = true;
357281
break;
358282
}
359-
} while (reader._bufferedBytes < minimumSize);
283+
} while (minimumSize != null && reader._bufferedBytes < minimumSize);
360284
}
361-
catch (OperationCanceledException)
285+
catch (OperationCanceledException ex)
362286
{
363287
reader.ClearCancellationToken();
364288

365-
if (tokenSource.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
289+
if (cancellationToken.IsCancellationRequested)
290+
{
291+
// Simulate an OCE triggered directly by the cancellationToken rather than the InternalTokenSource
292+
throw new OperationCanceledException(ex.Message, ex, cancellationToken);
293+
}
294+
else if (tokenSource.IsCancellationRequested)
366295
{
367296
// Catch cancellation and translate it into setting isCanceled = true
368297
isCanceled = true;
@@ -371,7 +300,6 @@ static async ValueTask<ReadResult> Core(StreamPipeReader reader, int minimumSize
371300
{
372301
throw;
373302
}
374-
375303
}
376304

377305
return new ReadResult(reader.GetCurrentReadOnlySequence(), isCanceled, reader._isStreamCompleted);

src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,13 @@ public async Task CanReadAtLeast(int bufferSize, bool bufferedRead)
139139
}
140140

141141
[Fact]
142-
public Task ReadAtLeastAsyncThrowsIfPassedCanceledCancellationToken()
142+
public async Task ReadAtLeastAsyncThrowsIfPassedCanceledCancellationToken()
143143
{
144-
ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(0, new CancellationToken(canceled: true));
145-
return Assert.ThrowsAsync<TaskCanceledException>(async () => await task);
144+
CancellationToken token = new CancellationToken(canceled: true);
145+
ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(0, token);
146+
TaskCanceledException tce = await Assert.ThrowsAsync<TaskCanceledException>(async () => await task);
147+
Assert.Equal(token, tce.CancellationToken);
148+
Assert.Null(tce.InnerException);
146149
}
147150

148151
[Fact]
@@ -164,12 +167,13 @@ public async Task WriteAndCancellingPendingReadBeforeReadAtLeastAsync()
164167
}
165168

166169
[Fact]
167-
public Task ReadAtLeastAsyncCancelableWhenWaitingForMoreData()
170+
public async Task ReadAtLeastAsyncCancelableWhenWaitingForMoreData()
168171
{
169172
CancellationTokenSource cts = new CancellationTokenSource();
170173
ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(1, cts.Token);
171174
cts.Cancel();
172-
return Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
175+
var oce = await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
176+
Assert.Equal(cts.Token, oce.CancellationToken);
173177
}
174178

175179
[Fact]
@@ -179,7 +183,8 @@ public async Task ReadAtLeastAsyncCancelableAfterReadingSome()
179183
await Pipe.WriteAsync(new byte[10], default);
180184
ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(11, cts.Token);
181185
cts.Cancel();
182-
await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
186+
var oce = await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
187+
Assert.Equal(cts.Token, oce.CancellationToken);
183188
}
184189

185190
[Fact]
@@ -191,7 +196,8 @@ public async Task ReadAtLeastAsyncCancelableAfterReadingSomeAndWritingAfterStart
191196
// Write, but not enough to unblock ReadAtLeastAsync
192197
await Pipe.WriteAsync(new byte[1], default);
193198
cts.Cancel();
194-
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await task);
199+
var oce = await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
200+
Assert.Equal(cts.Token, oce.CancellationToken);
195201
}
196202
}
197203
}

0 commit comments

Comments
 (0)