diff --git a/src/Renci.SshNet/Common/LinkedListQueue.cs b/src/Renci.SshNet/Common/LinkedListQueue.cs new file mode 100644 index 000000000..57dd99d4b --- /dev/null +++ b/src/Renci.SshNet/Common/LinkedListQueue.cs @@ -0,0 +1,141 @@ +namespace Renci.SshNet.Common +{ + using System; + using System.Threading; + + /// + /// Fast concurrent generic linked list queue. + /// + internal class LinkedListQueue : IDisposable + { + sealed class Entry + { + public E Item; + public Entry Next; + } + + private readonly object _lock = new object(); + + private Entry _first; + private Entry _last; + + private bool _isAddingCompleted; + + /// + /// Gets whether this has been marked as complete for adding and is empty. + /// + /// Whether this queue has been marked as complete for adding and is empty. + public bool IsCompleted + { + get { return _isAddingCompleted && _first == null && _last == null; } + } + + /// + /// Gets whether this has been marked as complete for adding. + /// + /// Whether this queue has been marked as complete for adding. + public bool IsAddingCompleted + { + get { return _isAddingCompleted; } + set + { + lock (_lock) + { + _isAddingCompleted = value; + } + } + } + + /// + /// Adds the item to . + /// + /// The item to be added to the queue. The value can be a null reference. + public void Add(T item) + { + lock (_lock) + { + if (_isAddingCompleted) + return; + + var entry = new Entry(); + entry.Item = item; + + if (_last != null) + { + _last.Next = entry; + } + + _last = entry; + + if (_first == null) + { + _first = entry; + } + + Monitor.PulseAll(_lock); + } + } + + /// + /// Marks the instances as not accepting any more additions. + /// + public void CompleteAdding() + { + lock (_lock) + { + IsAddingCompleted = true; + Monitor.PulseAll(_lock); + } + } + + /// + /// Tries to remove an item from the . + /// + /// true, if an item could be removed; otherwise false. + /// The item to be removed from the queue. + /// Wait for data or fail immediately if empty. + public bool TryTake(out T item, bool wait) + { + lock (_lock) + { + if (_first == null && !wait) + { + item = default(T); + return false; + } + + while (_first == null && !_isAddingCompleted) + Monitor.Wait(_lock); + + if (_first == null && _isAddingCompleted) + { + item = default(T); + return false; + } + + item = _first.Item; + _first = _first.Next; + return true; + } + } + + /// + /// Releases all resource used by the object. + /// + /// Call when you are finished using the + /// . The method leaves the + /// in an unusable state. After calling + /// , you must release all references to the + /// so the garbage collector can reclaim the memory that + /// the was occupying. + public void Dispose() + { + lock (_lock) + { + _first = null; + _last = null; + _isAddingCompleted = true; + } + } + } +} diff --git a/src/Renci.SshNet/Common/Pipe.cs b/src/Renci.SshNet/Common/Pipe.cs new file mode 100644 index 000000000..2ef08d49b --- /dev/null +++ b/src/Renci.SshNet/Common/Pipe.cs @@ -0,0 +1,47 @@ +namespace Renci.SshNet.Common +{ + using System; + using System.IO; + + /// + /// A generic pipe to pass through data. + /// + internal class Pipe : IDisposable + { + private readonly LinkedListQueue _queue; + + /// + /// Gets the input stream. + /// + /// The input stream. + public Stream InputStream { get; private set; } + + /// + /// Gets the output stream. + /// + /// The output stream. + public Stream OutputStream { get; private set; } + + public Pipe() + { + _queue = new LinkedListQueue(); + InputStream = new PipeInputStream(_queue); + OutputStream = new PipeOutputStream(_queue); + } + + /// + /// Releases all resource used by the object. + /// + /// Call when you are finished using the . The + /// method leaves the in an unusable state. After + /// calling , you must release all references to the + /// so the garbage collector can reclaim the memory that the + /// was occupying. + public void Dispose() + { + OutputStream.Dispose(); + InputStream.Dispose(); + _queue.Dispose(); + } + } +} diff --git a/src/Renci.SshNet/Common/PipeInputStream.cs b/src/Renci.SshNet/Common/PipeInputStream.cs new file mode 100644 index 000000000..d12fe9aa6 --- /dev/null +++ b/src/Renci.SshNet/Common/PipeInputStream.cs @@ -0,0 +1,122 @@ +namespace Renci.SshNet.Common +{ + using System; + using System.IO; + + internal class PipeInputStream : Stream + { + private LinkedListQueue _queue; + private byte[] _current; + private int _currentPosition; + private bool _isDisposed; + + public PipeInputStream(LinkedListQueue queue) + { + _queue = queue; + } + + public override void Flush() + { + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + if (buffer == null) + throw new ArgumentNullException("buffer"); + if (offset + count > buffer.Length) + throw new ArgumentException("The sum of offset and count is greater than the buffer length."); + if (offset < 0 || count < 0) + throw new ArgumentOutOfRangeException("offset", "offset or count is negative."); + if (_isDisposed) + throw CreateObjectDisposedException(); + + var bytesRead = 0; + + while (bytesRead < count) + { + if (_current == null || _currentPosition == _current.Length) + { + if (!_queue.TryTake(out _current, (bytesRead == 0))) + { + _current = null; + return bytesRead; + } + + _currentPosition = 0; + } + + var toRead = _current.Length - _currentPosition; + if (toRead > count - bytesRead) + toRead = count - bytesRead; + + Buffer.BlockCopy(_current, _currentPosition, buffer, offset + bytesRead, toRead); + + _currentPosition += toRead; + bytesRead += toRead; + } + + return bytesRead; + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new NotSupportedException(); + } + + public override bool CanRead + { + get { return !_isDisposed; } + } + + public override bool CanSeek + { + get { return false; } + } + + public override bool CanWrite + { + get { return false; } + } + + public override long Length + { + get + { + throw new NotSupportedException(); + } + } + + public override long Position + { + get + { + throw new NotSupportedException(); + } + set + { + throw new NotSupportedException(); + } + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + _isDisposed = true; + } + + private ObjectDisposedException CreateObjectDisposedException() + { + return new ObjectDisposedException(GetType().FullName); + } + } +} diff --git a/src/Renci.SshNet/Common/PipeOutputStream.cs b/src/Renci.SshNet/Common/PipeOutputStream.cs new file mode 100644 index 000000000..42905608f --- /dev/null +++ b/src/Renci.SshNet/Common/PipeOutputStream.cs @@ -0,0 +1,109 @@ +namespace Renci.SshNet.Common +{ + using System; + using System.IO; + + internal class PipeOutputStream : Stream + { + private LinkedListQueue _queue; + private bool _isDisposed; + + public PipeOutputStream(LinkedListQueue queue) + { + _queue = queue; + } + + public override void Flush() + { + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + throw new NotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + if (buffer == null) + throw new ArgumentNullException("buffer"); + if (offset + count > buffer.Length) + throw new ArgumentException("The sum of offset and count is greater than the buffer length."); + if (offset < 0 || count < 0) + throw new ArgumentOutOfRangeException("offset", "offset or count is negative."); + if (_isDisposed || _queue.IsAddingCompleted) + throw CreateObjectDisposedException(); + + byte[] tmp = new byte[count]; + Buffer.BlockCopy(buffer, offset, tmp, 0, count); + _queue.Add(tmp); + } + + public override bool CanRead + { + get { return false; } + } + + public override bool CanSeek + { + get { return false; } + } + + public override bool CanWrite + { + get { return !_isDisposed; } + } + + public override long Length + { + get + { + throw new NotSupportedException(); + } + } + + public override long Position + { + get + { + throw new NotSupportedException(); + } + set + { + throw new NotSupportedException(); + } + } + + public override void Close() + { + if (!_queue.IsAddingCompleted) + _queue.CompleteAdding(); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (!_isDisposed) + { + if (!_queue.IsAddingCompleted) + _queue.CompleteAdding(); + _isDisposed = true; + } + } + + private ObjectDisposedException CreateObjectDisposedException() + { + return new ObjectDisposedException(GetType().FullName); + } + } +} diff --git a/src/Renci.SshNet/Renci.SshNet.csproj b/src/Renci.SshNet/Renci.SshNet.csproj index fd278e17b..e5fe85017 100644 --- a/src/Renci.SshNet/Renci.SshNet.csproj +++ b/src/Renci.SshNet/Renci.SshNet.csproj @@ -441,6 +441,10 @@ + + + + diff --git a/src/Renci.SshNet/SshCommand.cs b/src/Renci.SshNet/SshCommand.cs index 37e91da08..4a2a9bb27 100644 --- a/src/Renci.SshNet/SshCommand.cs +++ b/src/Renci.SshNet/SshCommand.cs @@ -25,6 +25,8 @@ public class SshCommand : IDisposable private Exception _exception; private bool _hasError; private readonly object _endExecuteLock = new object(); + private Pipe _stdoutPipe; + private Pipe _stderrPipe; /// /// Gets the command text. @@ -56,7 +58,10 @@ public class SshCommand : IDisposable /// /// /// - public Stream OutputStream { get; private set; } + public Stream OutputStream + { + get { return _stdoutPipe.InputStream; } + } /// /// Gets the extended output stream. @@ -64,7 +69,10 @@ public class SshCommand : IDisposable /// /// /// - public Stream ExtendedOutputStream { get; private set; } + public Stream ExtendedOutputStream + { + get { return _stderrPipe.InputStream; } + } private StringBuilder _result; /// @@ -82,10 +90,10 @@ public string Result _result = new StringBuilder(); } - if (OutputStream != null && OutputStream.Length > 0) + if (_stdoutPipe != null) { // do not dispose the StreamReader, as it would also dispose the stream - var sr = new StreamReader(OutputStream, _encoding); + var sr = new StreamReader(_stdoutPipe.InputStream, _encoding); _result.Append(sr.ReadToEnd()); } @@ -111,10 +119,10 @@ public string Error _error = new StringBuilder(); } - if (ExtendedOutputStream != null && ExtendedOutputStream.Length > 0) + if (_stderrPipe != null) { // do not dispose the StreamReader, as it would also dispose the stream - var sr = new StreamReader(ExtendedOutputStream, _encoding); + var sr = new StreamReader(_stderrPipe.InputStream, _encoding); _error.Append(sr.ReadToEnd()); } @@ -230,23 +238,19 @@ public IAsyncResult BeginExecute(AsyncCallback callback, object state) if (string.IsNullOrEmpty(CommandText)) throw new ArgumentException("CommandText property is empty."); - var outputStream = OutputStream; - if (outputStream != null) + if (_stdoutPipe != null) { - outputStream.Dispose(); - OutputStream = null; + _stdoutPipe.Dispose(); } - var extendedOutputStream = ExtendedOutputStream; - if (extendedOutputStream != null) + if (_stderrPipe != null) { - extendedOutputStream.Dispose(); - ExtendedOutputStream = null; + _stderrPipe.Dispose(); } - // Initialize output streams - OutputStream = new PipeStream(); - ExtendedOutputStream = new PipeStream(); + // Initialize pipes + _stdoutPipe = new Pipe(); + _stderrPipe = new Pipe(); _result = null; _error = null; @@ -395,16 +399,14 @@ private void Session_ErrorOccured(object sender, ExceptionEventArgs e) private void Channel_Closed(object sender, ChannelEventArgs e) { - var outputStream = OutputStream; - if (outputStream != null) + if (_stdoutPipe != null) { - outputStream.Flush(); + _stdoutPipe.OutputStream.Close(); } - var extendedOutputStream = ExtendedOutputStream; - if (extendedOutputStream != null) + if (_stderrPipe != null) { - extendedOutputStream.Flush(); + _stderrPipe.OutputStream.Close(); } _asyncResult.IsCompleted = true; @@ -442,10 +444,9 @@ private void Channel_RequestReceived(object sender, ChannelRequestEventArgs e) private void Channel_ExtendedDataReceived(object sender, ChannelExtendedDataEventArgs e) { - if (ExtendedOutputStream != null) + if (_stderrPipe != null) { - ExtendedOutputStream.Write(e.Data, 0, e.Data.Length); - ExtendedOutputStream.Flush(); + _stderrPipe.OutputStream.Write(e.Data, 0, e.Data.Length); } if (e.DataTypeCode == 1) @@ -456,10 +457,9 @@ private void Channel_ExtendedDataReceived(object sender, ChannelExtendedDataEven private void Channel_DataReceived(object sender, ChannelDataEventArgs e) { - if (OutputStream != null) + if (_stdoutPipe != null) { - OutputStream.Write(e.Data, 0, e.Data.Length); - OutputStream.Flush(); + _stdoutPipe.OutputStream.Write(e.Data, 0, e.Data.Length); } if (_asyncResult != null) @@ -557,18 +557,16 @@ protected virtual void Dispose(bool disposing) _channel = null; } - var outputStream = OutputStream; - if (outputStream != null) + if (_stdoutPipe != null) { - outputStream.Dispose(); - OutputStream = null; + _stdoutPipe.Dispose(); + _stdoutPipe = null; } - var extendedOutputStream = ExtendedOutputStream; - if (extendedOutputStream != null) + if (_stderrPipe != null) { - extendedOutputStream.Dispose(); - ExtendedOutputStream = null; + _stderrPipe.Dispose(); + _stderrPipe = null; } var sessionErrorOccuredWaitHandle = _sessionErrorOccuredWaitHandle;