|  | 
| 1 | 1 | #nullable enable | 
| 2 | 2 | using System; | 
| 3 | 3 | using System.Collections.Generic; | 
|  | 4 | +using System.Diagnostics; | 
| 4 | 5 | using System.Diagnostics.CodeAnalysis; | 
| 5 | 6 | using System.Globalization; | 
| 6 | 7 | using System.IO; | 
| 7 | 8 | using System.Net; | 
| 8 | 9 | using System.Runtime.CompilerServices; | 
|  | 10 | +using System.Runtime.ExceptionServices; | 
| 9 | 11 | using System.Text; | 
| 10 | 12 | using System.Threading; | 
| 11 | 13 | using System.Threading.Tasks; | 
| @@ -2456,56 +2458,82 @@ private void InternalUploadFile(Stream input, string path, Flags flags, SftpUplo | 
| 2456 | 2458 |             // create buffer of optimal length | 
| 2457 | 2459 |             var buffer = new byte[_sftpSession.CalculateOptimalWriteLength(_bufferSize, handle)]; | 
| 2458 | 2460 | 
 | 
| 2459 |  | -            var bytesRead = input.Read(buffer, 0, buffer.Length); | 
|  | 2461 | +            int bytesRead; | 
| 2460 | 2462 |             var expectedResponses = 0; | 
| 2461 |  | -            var responseReceivedWaitHandle = new AutoResetEvent(initialState: false); | 
| 2462 | 2463 | 
 | 
| 2463 |  | -            do | 
|  | 2464 | +            // We will send out all the write requests without waiting for each response. | 
|  | 2465 | +            // Afterwards, we may wait on this handle until all responses are received | 
|  | 2466 | +            // or an error has occured. | 
|  | 2467 | +            using var mres = new ManualResetEventSlim(initialState: false); | 
|  | 2468 | + | 
|  | 2469 | +            ExceptionDispatchInfo? exception = null; | 
|  | 2470 | + | 
|  | 2471 | +            while ((bytesRead = input.Read(buffer, 0, buffer.Length)) != 0) | 
| 2464 | 2472 |             { | 
| 2465 |  | -                // Cancel upload | 
| 2466 | 2473 |                 if (asyncResult is not null && asyncResult.IsUploadCanceled) | 
| 2467 | 2474 |                 { | 
| 2468 | 2475 |                     break; | 
| 2469 | 2476 |                 } | 
| 2470 | 2477 | 
 | 
| 2471 |  | -                if (bytesRead > 0) | 
|  | 2478 | +                exception?.Throw(); | 
|  | 2479 | + | 
|  | 2480 | +                var writtenBytes = offset + (ulong)bytesRead; | 
|  | 2481 | + | 
|  | 2482 | +                _ = Interlocked.Increment(ref expectedResponses); | 
|  | 2483 | +                mres.Reset(); | 
|  | 2484 | + | 
|  | 2485 | +                _sftpSession.RequestWrite(handle, offset, buffer, offset: 0, bytesRead, wait: null, s => | 
| 2472 | 2486 |                 { | 
| 2473 |  | -                    var writtenBytes = offset + (ulong)bytesRead; | 
|  | 2487 | +                    var setHandle = false; | 
|  | 2488 | + | 
|  | 2489 | +                    try | 
|  | 2490 | +                    { | 
|  | 2491 | +                        if (Sftp.SftpSession.GetSftpException(s) is Exception ex) | 
|  | 2492 | +                        { | 
|  | 2493 | +                            exception = ExceptionDispatchInfo.Capture(ex); | 
|  | 2494 | +                        } | 
| 2474 | 2495 | 
 | 
| 2475 |  | -                    _sftpSession.RequestWrite(handle, offset, buffer, offset: 0, bytesRead, wait: null, s => | 
|  | 2496 | +                        if (exception is not null) | 
| 2476 | 2497 |                         { | 
| 2477 |  | -                            if (s.StatusCode == StatusCodes.Ok) | 
| 2478 |  | -                            { | 
| 2479 |  | -                                _ = Interlocked.Decrement(ref expectedResponses); | 
| 2480 |  | -                                _ = responseReceivedWaitHandle.Set(); | 
|  | 2498 | +                            setHandle = true; | 
|  | 2499 | +                            return; | 
|  | 2500 | +                        } | 
| 2481 | 2501 | 
 | 
| 2482 |  | -                                asyncResult?.Update(writtenBytes); | 
|  | 2502 | +                        Debug.Assert(s.StatusCode == StatusCodes.Ok); | 
| 2483 | 2503 | 
 | 
| 2484 |  | -                                // Call callback to report number of bytes written | 
| 2485 |  | -                                if (uploadCallback is not null) | 
| 2486 |  | -                                { | 
| 2487 |  | -                                    // Execute callback on different thread | 
| 2488 |  | -                                    ThreadAbstraction.ExecuteThread(() => uploadCallback(writtenBytes)); | 
| 2489 |  | -                                } | 
| 2490 |  | -                            } | 
| 2491 |  | -                        }); | 
|  | 2504 | +                        asyncResult?.Update(writtenBytes); | 
|  | 2505 | + | 
|  | 2506 | +                        // Call callback to report number of bytes written | 
|  | 2507 | +                        if (uploadCallback is not null) | 
|  | 2508 | +                        { | 
|  | 2509 | +                            // Execute callback on different thread | 
|  | 2510 | +                            ThreadAbstraction.ExecuteThread(() => uploadCallback(writtenBytes)); | 
|  | 2511 | +                        } | 
|  | 2512 | +                    } | 
|  | 2513 | +                    finally | 
|  | 2514 | +                    { | 
|  | 2515 | +                        if (Interlocked.Decrement(ref expectedResponses) == 0 || setHandle) | 
|  | 2516 | +                        { | 
|  | 2517 | +                            mres.Set(); | 
|  | 2518 | +                        } | 
|  | 2519 | +                    } | 
|  | 2520 | +                }); | 
| 2492 | 2521 | 
 | 
| 2493 |  | -                    _ = Interlocked.Increment(ref expectedResponses); | 
|  | 2522 | +                offset += (ulong)bytesRead; | 
|  | 2523 | +            } | 
| 2494 | 2524 | 
 | 
| 2495 |  | -                    offset += (ulong)bytesRead; | 
|  | 2525 | +            // Make sure the read of exception cannot be executed ahead of | 
|  | 2526 | +            // the read of expectedResponses so that we do not miss an | 
|  | 2527 | +            // exception. | 
| 2496 | 2528 | 
 | 
| 2497 |  | -                    bytesRead = input.Read(buffer, 0, buffer.Length); | 
| 2498 |  | -                } | 
| 2499 |  | -                else if (expectedResponses > 0) | 
| 2500 |  | -                { | 
| 2501 |  | -                    // Wait for expectedResponses to change | 
| 2502 |  | -                    _sftpSession.WaitOnHandle(responseReceivedWaitHandle, _operationTimeout); | 
| 2503 |  | -                } | 
|  | 2529 | +            if (Volatile.Read(ref expectedResponses) != 0) | 
|  | 2530 | +            { | 
|  | 2531 | +                _sftpSession.WaitOnHandle(mres.WaitHandle, _operationTimeout); | 
| 2504 | 2532 |             } | 
| 2505 |  | -            while (expectedResponses > 0 || bytesRead > 0); | 
|  | 2533 | + | 
|  | 2534 | +            exception?.Throw(); | 
| 2506 | 2535 | 
 | 
| 2507 | 2536 |             _sftpSession.RequestClose(handle); | 
| 2508 |  | -            responseReceivedWaitHandle.Dispose(); | 
| 2509 | 2537 |         } | 
| 2510 | 2538 | 
 | 
| 2511 | 2539 |         private async Task InternalUploadFileAsync(Stream input, string path, CancellationToken cancellationToken) | 
|  | 
0 commit comments