From f61033e9b272cb77448bf223ac8715e316dd56ad Mon Sep 17 00:00:00 2001 From: J W Date: Wed, 13 May 2020 18:44:30 -0400 Subject: [PATCH 01/14] Update file to use channel --- src/Microsoft.ML.Sweeper/AsyncSweeper.cs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/Microsoft.ML.Sweeper/AsyncSweeper.cs b/src/Microsoft.ML.Sweeper/AsyncSweeper.cs index a300999ebc..247173438d 100644 --- a/src/Microsoft.ML.Sweeper/AsyncSweeper.cs +++ b/src/Microsoft.ML.Sweeper/AsyncSweeper.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using Microsoft.ML; @@ -168,7 +169,7 @@ public sealed class Options private readonly object _lock; private readonly CancellationTokenSource _cts; - private readonly BufferBlock _paramQueue; + private readonly Channel _paramQueue; private readonly int _relaxation; private readonly ISweeper _baseSweeper; private readonly IHost _host; @@ -208,7 +209,7 @@ public DeterministicSweeperAsync(IHostEnvironment env, Options options) _lock = new object(); _results = new List(); _nullRuns = new HashSet(); - _paramQueue = new BufferBlock(); + _paramQueue = Channel.CreateUnbounded(); PrepareNextBatch(null); } @@ -220,12 +221,12 @@ private void PrepareNextBatch(IEnumerable results) if (Utils.Size(paramSets) == 0) { // Mark the queue as completed. - _paramQueue.Complete(); + _paramQueue.Writer.Complete(); return; } // Assign an id to each ParameterSet and enque it. foreach (var paramSet in paramSets) - _paramQueue.Post(new ParameterSetWithId(_numGenerated++, paramSet)); + _paramQueue.Writer.TryWrite(new ParameterSetWithId(_numGenerated++, paramSet)); EnsureResultsSize(); } @@ -278,7 +279,7 @@ public async Task ProposeAsync() return null; try { - return await _paramQueue.ReceiveAsync(_cts.Token); + return await _paramQueue.Reader.ReadAsync(_cts.Token); } catch (InvalidOperationException) { From 416da19b7b8b22d5ef1135cc0d198b566a7624c6 Mon Sep 17 00:00:00 2001 From: J W Date: Wed, 13 May 2020 18:47:18 -0400 Subject: [PATCH 02/14] Add channels package --- src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj b/src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj index 9c08ba85af..cd7877273b 100644 --- a/src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj +++ b/src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj @@ -6,6 +6,10 @@ true + + + + From 72beecb5fe6cf44baaa36456cfd507c85891eac2 Mon Sep 17 00:00:00 2001 From: J W Date: Thu, 14 May 2020 12:25:56 -0400 Subject: [PATCH 03/14] Update for feedback --- src/Microsoft.ML.Sweeper/AsyncSweeper.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Microsoft.ML.Sweeper/AsyncSweeper.cs b/src/Microsoft.ML.Sweeper/AsyncSweeper.cs index 247173438d..6e71d13729 100644 --- a/src/Microsoft.ML.Sweeper/AsyncSweeper.cs +++ b/src/Microsoft.ML.Sweeper/AsyncSweeper.cs @@ -7,7 +7,6 @@ using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; using Microsoft.ML; using Microsoft.ML.CommandLine; using Microsoft.ML.Internal.Utilities; @@ -169,7 +168,7 @@ public sealed class Options private readonly object _lock; private readonly CancellationTokenSource _cts; - private readonly Channel _paramQueue; + private readonly Channel _paramChannel; private readonly int _relaxation; private readonly ISweeper _baseSweeper; private readonly IHost _host; @@ -209,7 +208,8 @@ public DeterministicSweeperAsync(IHostEnvironment env, Options options) _lock = new object(); _results = new List(); _nullRuns = new HashSet(); - _paramQueue = Channel.CreateUnbounded(); + _paramChannel = Channel.CreateUnbounded( + new UnboundedChannelOptions { SingleWriter = true }); PrepareNextBatch(null); } @@ -221,12 +221,12 @@ private void PrepareNextBatch(IEnumerable results) if (Utils.Size(paramSets) == 0) { // Mark the queue as completed. - _paramQueue.Writer.Complete(); + _paramChannel.Writer.Complete(); return; } // Assign an id to each ParameterSet and enque it. foreach (var paramSet in paramSets) - _paramQueue.Writer.TryWrite(new ParameterSetWithId(_numGenerated++, paramSet)); + _paramChannel.Writer.TryWrite(new ParameterSetWithId(_numGenerated++, paramSet)); EnsureResultsSize(); } @@ -279,7 +279,7 @@ public async Task ProposeAsync() return null; try { - return await _paramQueue.Reader.ReadAsync(_cts.Token); + return await _paramChannel.Reader.ReadAsync(_cts.Token); } catch (InvalidOperationException) { From e688d05a23e0db7fa2b3b264aaf4ec8615d02c86 Mon Sep 17 00:00:00 2001 From: J W Date: Thu, 14 May 2020 13:48:24 -0400 Subject: [PATCH 04/14] Update more buffer blocks to use channel --- .../Microsoft.ML.Data.csproj | 1 + .../Transforms/RowShufflingTransformer.cs | 40 +++++++++---------- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj b/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj index f3e7d96b59..7bc7652579 100644 --- a/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj +++ b/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj @@ -10,6 +10,7 @@ + diff --git a/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs b/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs index 090a34049e..0ec39934a4 100644 --- a/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs +++ b/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs @@ -5,8 +5,8 @@ using System; using System.Collections.Generic; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; using Microsoft.ML; using Microsoft.ML.CommandLine; using Microsoft.ML.Data; @@ -486,8 +486,8 @@ private static readonly FuncInstanceMethodInfo1 _createGe private int _liveCount; private bool _doneConsuming; - private readonly BufferBlock _toProduce; - private readonly BufferBlock _toConsume; + private readonly Channel _toProduceChannel; + private readonly Channel _toConsumeChannel; private readonly Task _producerTask; private Exception _producerTaskException; @@ -541,13 +541,13 @@ public Cursor(IChannelProvider provider, int poolRows, DataViewRowCursor input, _liveCount = 1; // Set up the producer worker. - _toConsume = new BufferBlock(); - _toProduce = new BufferBlock(); + _toConsumeChannel = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleWriter = true }); + _toProduceChannel = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleWriter = true }); // First request the pool - 1 + block size rows, to get us going. - PostAssert(_toProduce, _poolRows - 1 + _blockSize); + PostAssert(_toProduceChannel, _poolRows - 1 + _blockSize); // Queue up the remaining capacity. for (int i = 1; i < _bufferDepth; ++i) - PostAssert(_toProduce, _blockSize); + PostAssert(_toProduceChannel, _blockSize); _producerTask = ProduceAsync(); } @@ -559,28 +559,28 @@ protected override void Dispose(bool disposing) if (disposing) { - _toProduce.Complete(); + _toProduceChannel.Writer.Complete(); _producerTask.Wait(); // Complete the consumer after the producerTask has finished, since producerTask could // have posted more items to _toConsume. - _toConsume.Complete(); + _toConsumeChannel.Writer.Complete(); // Drain both BufferBlocks - this prevents what appears to be memory leaks when using the VS Debugger // because if a BufferBlock still contains items, its underlying Tasks are not getting completed. // See https://github.com/dotnet/corefx/issues/30582 for the VS Debugger issue. // See also https://github.com/dotnet/machinelearning/issues/4399 - _toProduce.TryReceiveAll(out _); - _toConsume.TryReceiveAll(out _); + _toProduceChannel.Reader.ReadAsync(); + _toConsumeChannel.Reader.ReadAsync(); } _disposed = true; base.Dispose(disposing); } - public static void PostAssert(ITargetBlock target, T item) + public static void PostAssert(Channel target, T item) { - bool retval = target.Post(item); + bool retval = target.Writer.TryWrite(item); Contracts.Assert(retval); } @@ -594,10 +594,10 @@ private async Task ProduceAsync() try { int circularIndex = 0; - while (await _toProduce.OutputAvailableAsync().ConfigureAwait(false)) + while (await _toProduceChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) { int requested; - if (!_toProduce.TryReceive(out requested)) + if (!_toProduceChannel.Reader.TryRead(out requested)) { // OutputAvailableAsync returned true, but TryReceive returned false - // so loop back around and try again. @@ -618,14 +618,14 @@ private async Task ProduceAsync() if (circularIndex == _pipeIndices.Length) circularIndex = 0; } - PostAssert(_toConsume, numRows); + PostAssert(_toConsumeChannel, numRows); if (numRows < requested) { // We've reached the end of the cursor. Send the sentinel, then exit. // This assumes that the receiver will receive things in Post order // (so that the sentinel is received, after the last Post). if (numRows > 0) - PostAssert(_toConsume, 0); + PostAssert(_toConsumeChannel, 0); return; } } @@ -634,7 +634,7 @@ private async Task ProduceAsync() { _producerTaskException = ex; // Send the sentinel in this case as well, the field will be checked. - PostAssert(_toConsume, 0); + PostAssert(_toConsumeChannel, 0); } } @@ -652,14 +652,14 @@ protected override bool MoveNextCore() // We should let the producer know it can give us more stuff. // It is possible for int values to be sent beyond the // end of the sentinel, but we suppose this is irrelevant. - PostAssert(_toProduce, _deadCount); + PostAssert(_toProduceChannel, _deadCount); _deadCount = 0; } while (_liveCount < _poolRows && !_doneConsuming) { // We are under capacity. Try to get some more. - int got = _toConsume.Receive(); + _toConsumeChannel.Reader.TryRead(out int got); if (got == 0) { // We've reached the end sentinel. There's no reason From 77ec8346fbd139b9599cbb4f95912781c3fac7a9 Mon Sep 17 00:00:00 2001 From: J W Date: Fri, 15 May 2020 05:36:34 -0400 Subject: [PATCH 05/14] Add version to props file --- build/Dependencies.props | 1 + src/Microsoft.ML.Data/Microsoft.ML.Data.csproj | 2 +- src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/build/Dependencies.props b/build/Dependencies.props index 9ede98e350..760cb938fb 100644 --- a/build/Dependencies.props +++ b/build/Dependencies.props @@ -9,6 +9,7 @@ 4.5.1 4.3.0 4.8.0 + 4.7.1 diff --git a/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj b/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj index 7bc7652579..d43b3392c3 100644 --- a/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj +++ b/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj @@ -10,7 +10,7 @@ - + diff --git a/src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj b/src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj index cd7877273b..eb5cd2bb51 100644 --- a/src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj +++ b/src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj @@ -7,7 +7,7 @@ - + From 32f3a01da970971754fcdcbe0644c5cf905f8b47 Mon Sep 17 00:00:00 2001 From: J W Date: Wed, 20 May 2020 16:28:18 -0400 Subject: [PATCH 06/14] Remove build dependencies that aren't needed --- build/Dependencies.props | 1 - src/Microsoft.ML.Data/Microsoft.ML.Data.csproj | 1 - src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj | 6 +----- 3 files changed, 1 insertion(+), 7 deletions(-) diff --git a/build/Dependencies.props b/build/Dependencies.props index 760cb938fb..4afccaa748 100644 --- a/build/Dependencies.props +++ b/build/Dependencies.props @@ -8,7 +8,6 @@ 1.5.0 4.5.1 4.3.0 - 4.8.0 4.7.1 diff --git a/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj b/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj index d43b3392c3..8076f82ef9 100644 --- a/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj +++ b/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj @@ -11,7 +11,6 @@ - diff --git a/src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj b/src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj index eb5cd2bb51..33be88ab2e 100644 --- a/src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj +++ b/src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj @@ -5,11 +5,7 @@ CORECLR true - - - - - + From b6b39c8975707b1cc859d3ec232870cbc22fd44f Mon Sep 17 00:00:00 2001 From: J W Date: Thu, 21 May 2020 04:31:17 -0400 Subject: [PATCH 07/14] Update comments --- .../Transforms/RowShufflingTransformer.cs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs b/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs index 0ec39934a4..599dc50f1d 100644 --- a/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs +++ b/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs @@ -562,16 +562,16 @@ protected override void Dispose(bool disposing) _toProduceChannel.Writer.Complete(); _producerTask.Wait(); - // Complete the consumer after the producerTask has finished, since producerTask could - // have posted more items to _toConsume. + // Complete the channel after the producerTask has finished, since producerTask could + // have posted more items to _toConsumeChannel. _toConsumeChannel.Writer.Complete(); // Drain both BufferBlocks - this prevents what appears to be memory leaks when using the VS Debugger // because if a BufferBlock still contains items, its underlying Tasks are not getting completed. // See https://github.com/dotnet/corefx/issues/30582 for the VS Debugger issue. // See also https://github.com/dotnet/machinelearning/issues/4399 - _toProduceChannel.Reader.ReadAsync(); - _toConsumeChannel.Reader.ReadAsync(); + //_toProduceChannel.Reader.ReadAsync(); + //_toConsumeChannel.Reader.ReadAsync(); } _disposed = true; @@ -599,7 +599,8 @@ private async Task ProduceAsync() int requested; if (!_toProduceChannel.Reader.TryRead(out requested)) { - // OutputAvailableAsync returned true, but TryReceive returned false - + // The producer Channel's Reader.WaitToReadAsync returned true, + // but the Reader's TryRead returned false - // so loop back around and try again. continue; } @@ -651,7 +652,7 @@ protected override bool MoveNextCore() { // We should let the producer know it can give us more stuff. // It is possible for int values to be sent beyond the - // end of the sentinel, but we suppose this is irrelevant. + // end of the Channel, but we suppose this is irrelevant. PostAssert(_toProduceChannel, _deadCount); _deadCount = 0; } @@ -662,7 +663,7 @@ protected override bool MoveNextCore() _toConsumeChannel.Reader.TryRead(out int got); if (got == 0) { - // We've reached the end sentinel. There's no reason + // We've reached the end of the Channel. There's no reason // to attempt further communication with the producer. // Check whether something horrible happened. if (_producerTaskException != null) From 8fc6a568a5e99c9159786de271071cad0e98115e Mon Sep 17 00:00:00 2001 From: J W Date: Thu, 21 May 2020 08:16:07 -0400 Subject: [PATCH 08/14] Add back data flow package --- build/Dependencies.props | 1 + src/Microsoft.ML.Data/Microsoft.ML.Data.csproj | 1 + 2 files changed, 2 insertions(+) diff --git a/build/Dependencies.props b/build/Dependencies.props index 4afccaa748..760cb938fb 100644 --- a/build/Dependencies.props +++ b/build/Dependencies.props @@ -8,6 +8,7 @@ 1.5.0 4.5.1 4.3.0 + 4.8.0 4.7.1 diff --git a/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj b/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj index 8076f82ef9..d43b3392c3 100644 --- a/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj +++ b/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj @@ -11,6 +11,7 @@ + From 335b7249b769827f079c35ca1f02afecabb8f654 Mon Sep 17 00:00:00 2001 From: J W Date: Thu, 21 May 2020 12:08:36 -0400 Subject: [PATCH 09/14] Remove data flow package everywhere --- build/Dependencies.props | 1 - pkg/Microsoft.ML/Microsoft.ML.nupkgproj | 1 - src/Microsoft.ML.Data/Microsoft.ML.Data.csproj | 1 - 3 files changed, 3 deletions(-) diff --git a/build/Dependencies.props b/build/Dependencies.props index 760cb938fb..4afccaa748 100644 --- a/build/Dependencies.props +++ b/build/Dependencies.props @@ -8,7 +8,6 @@ 1.5.0 4.5.1 4.3.0 - 4.8.0 4.7.1 diff --git a/pkg/Microsoft.ML/Microsoft.ML.nupkgproj b/pkg/Microsoft.ML/Microsoft.ML.nupkgproj index e8f7feace3..aae0ea22ae 100644 --- a/pkg/Microsoft.ML/Microsoft.ML.nupkgproj +++ b/pkg/Microsoft.ML/Microsoft.ML.nupkgproj @@ -11,7 +11,6 @@ - diff --git a/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj b/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj index d43b3392c3..8076f82ef9 100644 --- a/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj +++ b/src/Microsoft.ML.Data/Microsoft.ML.Data.csproj @@ -11,7 +11,6 @@ - From 8a45c1d8e1a707a81e6acd9b83c6c5c5c2f5fa26 Mon Sep 17 00:00:00 2001 From: J W Date: Thu, 21 May 2020 12:13:25 -0400 Subject: [PATCH 10/14] Update from PR feedback --- .../Transforms/RowShufflingTransformer.cs | 40 ++++++------------- 1 file changed, 13 insertions(+), 27 deletions(-) diff --git a/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs b/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs index 599dc50f1d..5128d70ae7 100644 --- a/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs +++ b/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs @@ -557,23 +557,6 @@ protected override void Dispose(bool disposing) if (_disposed) return; - if (disposing) - { - _toProduceChannel.Writer.Complete(); - _producerTask.Wait(); - - // Complete the channel after the producerTask has finished, since producerTask could - // have posted more items to _toConsumeChannel. - _toConsumeChannel.Writer.Complete(); - - // Drain both BufferBlocks - this prevents what appears to be memory leaks when using the VS Debugger - // because if a BufferBlock still contains items, its underlying Tasks are not getting completed. - // See https://github.com/dotnet/corefx/issues/30582 for the VS Debugger issue. - // See also https://github.com/dotnet/machinelearning/issues/4399 - //_toProduceChannel.Reader.ReadAsync(); - //_toConsumeChannel.Reader.ReadAsync(); - } - _disposed = true; base.Dispose(disposing); } @@ -660,18 +643,21 @@ protected override bool MoveNextCore() while (_liveCount < _poolRows && !_doneConsuming) { // We are under capacity. Try to get some more. - _toConsumeChannel.Reader.TryRead(out int got); - if (got == 0) + var hasReadItem = _toConsumeChannel.Reader.TryRead(out int got); + if (hasReadItem) { - // We've reached the end of the Channel. There's no reason - // to attempt further communication with the producer. - // Check whether something horrible happened. - if (_producerTaskException != null) - throw Ch.Except(_producerTaskException, "Shuffle input cursor reader failed with an exception"); - _doneConsuming = true; - break; + if (got == 0) + { + // We've reached the end of the Channel. There's no reason + // to attempt further communication with the producer. + // Check whether something horrible happened. + if (_producerTaskException != null) + throw Ch.Except(_producerTaskException, "Shuffle input cursor reader failed with an exception"); + _doneConsuming = true; + break; + } + _liveCount += got; } - _liveCount += got; } if (_liveCount == 0) return false; From 9b3a5e6e8c7799a6b2661e0173d7e8886d68e4f0 Mon Sep 17 00:00:00 2001 From: J W Date: Thu, 11 Jun 2020 05:12:16 -0400 Subject: [PATCH 11/14] Revert carriage return --- src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj b/src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj index 33be88ab2e..9c08ba85af 100644 --- a/src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj +++ b/src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj @@ -5,7 +5,7 @@ CORECLR true - + From 325f8dd701b46ca8ebb50bc8f15c80bd586383a6 Mon Sep 17 00:00:00 2001 From: J W Date: Thu, 11 Jun 2020 05:14:18 -0400 Subject: [PATCH 12/14] Updates from comments --- pkg/Microsoft.ML/Microsoft.ML.nupkgproj | 1 + .../Transforms/RowShufflingTransformer.cs | 9 --------- test/Microsoft.ML.FSharp.Tests/SmokeTests.fs | 1 - 3 files changed, 1 insertion(+), 10 deletions(-) diff --git a/pkg/Microsoft.ML/Microsoft.ML.nupkgproj b/pkg/Microsoft.ML/Microsoft.ML.nupkgproj index aae0ea22ae..43ad73f248 100644 --- a/pkg/Microsoft.ML/Microsoft.ML.nupkgproj +++ b/pkg/Microsoft.ML/Microsoft.ML.nupkgproj @@ -14,6 +14,7 @@ + diff --git a/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs b/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs index 5128d70ae7..1ae759a851 100644 --- a/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs +++ b/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs @@ -552,15 +552,6 @@ public Cursor(IChannelProvider provider, int poolRows, DataViewRowCursor input, _producerTask = ProduceAsync(); } - protected override void Dispose(bool disposing) - { - if (_disposed) - return; - - _disposed = true; - base.Dispose(disposing); - } - public static void PostAssert(Channel target, T item) { bool retval = target.Writer.TryWrite(item); diff --git a/test/Microsoft.ML.FSharp.Tests/SmokeTests.fs b/test/Microsoft.ML.FSharp.Tests/SmokeTests.fs index 292e16f30f..2c6b18811d 100644 --- a/test/Microsoft.ML.FSharp.Tests/SmokeTests.fs +++ b/test/Microsoft.ML.FSharp.Tests/SmokeTests.fs @@ -22,7 +22,6 @@ #r @"../../bin/AnyCPU.Debug/Microsoft.ML.FSharp.Tests/net461/Google.Protobuf.dll" #r @"../../bin/AnyCPU.Debug/Microsoft.ML.FSharp.Tests/net461/Newtonsoft.Json.dll" #r @"../../bin/AnyCPU.Debug/Microsoft.ML.FSharp.Tests/net461/System.CodeDom.dll" -#r @"../../bin/AnyCPU.Debug/Microsoft.ML.FSharp.Tests/net461/System.Threading.Tasks.Dataflow.dll" #r @"../../bin/AnyCPU.Debug/Microsoft.ML.FSharp.Tests/net461/Microsoft.ML.CpuMath.dll" #r @"../../bin/AnyCPU.Debug/Microsoft.ML.FSharp.Tests/net461/Microsoft.ML.Data.dll" #r @"../../bin/AnyCPU.Debug/Microsoft.ML.FSharp.Tests/net461/Microsoft.ML.Transforms.dll" From b853e84ba3b6e11083bf8423676347aabdabdc32 Mon Sep 17 00:00:00 2001 From: J W Date: Thu, 11 Jun 2020 05:29:55 -0400 Subject: [PATCH 13/14] Remove disposed variable --- src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs b/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs index 1ae759a851..72c499983f 100644 --- a/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs +++ b/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs @@ -492,7 +492,6 @@ private static readonly FuncInstanceMethodInfo1 _createGe private Exception _producerTaskException; private readonly int[] _colToActivesIndex; - private bool _disposed; public override DataViewSchema Schema => _input.Schema; From 158ab0790677161fd3a1a71c78329baa573ba9b7 Mon Sep 17 00:00:00 2001 From: J W Date: Wed, 8 Jul 2020 06:33:50 -0400 Subject: [PATCH 14/14] Block receiving thread --- .../Transforms/RowShufflingTransformer.cs | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs b/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs index 72c499983f..87e1ecdec8 100644 --- a/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs +++ b/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs @@ -633,20 +633,23 @@ protected override bool MoveNextCore() while (_liveCount < _poolRows && !_doneConsuming) { // We are under capacity. Try to get some more. - var hasReadItem = _toConsumeChannel.Reader.TryRead(out int got); - if (hasReadItem) + while (_toConsumeChannel.Reader.WaitToReadAsync().GetAwaiter().GetResult()) { - if (got == 0) + var hasReadItem = _toConsumeChannel.Reader.TryRead(out int got); + if (hasReadItem) { - // We've reached the end of the Channel. There's no reason - // to attempt further communication with the producer. - // Check whether something horrible happened. - if (_producerTaskException != null) - throw Ch.Except(_producerTaskException, "Shuffle input cursor reader failed with an exception"); - _doneConsuming = true; - break; + if (got == 0) + { + // We've reached the end of the Channel. There's no reason + // to attempt further communication with the producer. + // Check whether something horrible happened. + if (_producerTaskException != null) + throw Ch.Except(_producerTaskException, "Shuffle input cursor reader failed with an exception"); + _doneConsuming = true; + break; + } + _liveCount += got; } - _liveCount += got; } } if (_liveCount == 0)