From 49bb9682d073264f76da55ed2786faff1387a268 Mon Sep 17 00:00:00 2001 From: Mandy Shieh Date: Thu, 3 May 2018 17:49:18 -0700 Subject: [PATCH 1/5] block size exception --- src/Microsoft.ML.Parquet/ParquetLoader.cs | 40 ++++++++++++++++++----- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/src/Microsoft.ML.Parquet/ParquetLoader.cs b/src/Microsoft.ML.Parquet/ParquetLoader.cs index f0ecde34dc..418017ee10 100644 --- a/src/Microsoft.ML.Parquet/ParquetLoader.cs +++ b/src/Microsoft.ML.Parquet/ParquetLoader.cs @@ -390,9 +390,21 @@ public Cursor(ParquetLoader parent, Func predicate, IRandom rand) Columns = _loader._columnsLoaded.Select(i => i.Name).ToArray() }; - int numBlocks = (int)Math.Ceiling(((decimal)parent.GetRowCount() / _readerOptions.Count)); - int[] blockOrder = _rand == null ? Utils.GetIdentityPermutation(numBlocks) : Utils.GetRandomPermutation(rand, numBlocks); - _blockEnumerator = blockOrder.GetEnumerator(); + try + { + int numBlocks = checked((int)Math.Ceiling(((decimal)parent.GetRowCount() / _readerOptions.Count))); + int[] blockOrder = _rand == null ? Utils.GetIdentityPermutation(numBlocks) : Utils.GetRandomPermutation(rand, numBlocks); + _blockEnumerator = blockOrder.GetEnumerator(); + } + catch (Exception e) + { + if (e is OutOfMemoryException || e is OverflowException) + { + throw new InvalidDataException("Error due to too many blocks. Try increasing block size.", e); + } + + throw; + } _dataSetEnumerator = new int[0].GetEnumerator(); // Initialize an empty enumerator to get started _columnValues = new IList[_actives.Length]; @@ -477,7 +489,7 @@ protected override bool MoveNextCore() } else if (_blockEnumerator.MoveNext()) { - _readerOptions.Offset = (int)_blockEnumerator.Current * _readerOptions.Count; + _readerOptions.Offset = (long)_blockEnumerator.Current * _readerOptions.Count; // When current dataset runs out, read the next portion of the parquet file. 
DataSet ds; @@ -486,9 +498,21 @@ protected override bool MoveNextCore() ds = ParquetReader.Read(_loader._parquetStream, _loader._parquetOptions, _readerOptions); } - int[] dataSetOrder = _rand == null ? Utils.GetIdentityPermutation(ds.RowCount) : Utils.GetRandomPermutation(_rand, ds.RowCount); - _dataSetEnumerator = dataSetOrder.GetEnumerator(); - _curDataSetRow = dataSetOrder[0]; + try + { + int[] dataSetOrder = _rand == null ? Utils.GetIdentityPermutation(ds.RowCount) : Utils.GetRandomPermutation(_rand, ds.RowCount); + _dataSetEnumerator = dataSetOrder.GetEnumerator(); + _curDataSetRow = dataSetOrder[0]; + } + catch (Exception e) + { + if (e is OutOfMemoryException) + { + throw new InvalidDataException("Error caused because block size too big. Try decreasing block size.", e); + } + + throw; + } // Cache list for each active column for (int i = 0; i < _actives.Length; i++) @@ -671,4 +695,4 @@ private string ConvertListToString(IList list) } } } -} +} \ No newline at end of file From 64de2d6aaf75379ef33b6cc429c6d6f024618a23 Mon Sep 17 00:00:00 2001 From: Mandy Shieh Date: Tue, 8 May 2018 12:04:18 -0700 Subject: [PATCH 2/5] catch exception when style --- src/Microsoft.ML.Parquet/ParquetLoader.cs | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/src/Microsoft.ML.Parquet/ParquetLoader.cs b/src/Microsoft.ML.Parquet/ParquetLoader.cs index 418017ee10..fa6465f998 100644 --- a/src/Microsoft.ML.Parquet/ParquetLoader.cs +++ b/src/Microsoft.ML.Parquet/ParquetLoader.cs @@ -396,14 +396,9 @@ public Cursor(ParquetLoader parent, Func predicate, IRandom rand) int[] blockOrder = _rand == null ? 
Utils.GetIdentityPermutation(numBlocks) : Utils.GetRandomPermutation(rand, numBlocks); _blockEnumerator = blockOrder.GetEnumerator(); } - catch (Exception e) + catch (Exception e) when (e is OutOfMemoryException || e is OverflowException) { - if (e is OutOfMemoryException || e is OverflowException) - { - throw new InvalidDataException("Error due to too many blocks. Try increasing block size.", e); - } - - throw; + parent._host.Except(e, "Error due to too many blocks. Try increasing block size."); } _dataSetEnumerator = new int[0].GetEnumerator(); // Initialize an empty enumerator to get started @@ -504,14 +499,9 @@ protected override bool MoveNextCore() _dataSetEnumerator = dataSetOrder.GetEnumerator(); _curDataSetRow = dataSetOrder[0]; } - catch (Exception e) + catch (Exception e) when (e is OutOfMemoryException) { - if (e is OutOfMemoryException) - { - throw new InvalidDataException("Error caused because block size too big. Try decreasing block size.", e); - } - - throw; + throw new InvalidDataException("Error caused because block size is too big. 
Try decreasing block size.", e); } // Cache list for each active column From 6e4d46c62385acb90db29bf8efa72f44569eec5f Mon Sep 17 00:00:00 2001 From: Mandy Shieh Date: Thu, 10 May 2018 14:16:17 -0700 Subject: [PATCH 3/5] address comments --- src/Microsoft.ML.Parquet/ParquetLoader.cs | 47 +++++++++++++++-------- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/src/Microsoft.ML.Parquet/ParquetLoader.cs b/src/Microsoft.ML.Parquet/ParquetLoader.cs index fa6465f998..a05212b7be 100644 --- a/src/Microsoft.ML.Parquet/ParquetLoader.cs +++ b/src/Microsoft.ML.Parquet/ParquetLoader.cs @@ -94,7 +94,8 @@ public sealed class Arguments private readonly int _columnChunkReadSize; private readonly Column[] _columnsLoaded; private readonly DataSet _schemaDataSet; - private const int _defaultColumnChunkReadSize = 100; // Should ideally be close to Rowgroup size + private const int _defaultColumnChunkReadSize = 1000000; + private const string _chunkSizeShortName = "chunkSize"; private bool _disposed; @@ -368,8 +369,8 @@ private sealed class Cursor : RootCursorBase, IRowCursor private readonly Delegate[] _getters; private readonly ReaderOptions _readerOptions; private int _curDataSetRow; - private IEnumerator _dataSetEnumerator; - private IEnumerator _blockEnumerator; + private IEnumerator _dataSetEnumerator; + private IEnumerator _blockEnumerator; private IList[] _columnValues; private IRandom _rand; @@ -390,18 +391,32 @@ public Cursor(ParquetLoader parent, Func predicate, IRandom rand) Columns = _loader._columnsLoaded.Select(i => i.Name).ToArray() }; + // The number of blocks is calculated based on the specified rows in a block (defaults to 1M). + // Since we want to shuffle the blocks in addition to shuffling the rows in each block, checks + // are put in place to ensure we can produce a shuffle order for the blocks. 
+ int numBlocks; + int[] blockOrder; try { - int numBlocks = checked((int)Math.Ceiling(((decimal)parent.GetRowCount() / _readerOptions.Count))); - int[] blockOrder = _rand == null ? Utils.GetIdentityPermutation(numBlocks) : Utils.GetRandomPermutation(rand, numBlocks); - _blockEnumerator = blockOrder.GetEnumerator(); + numBlocks = checked((int)Math.Ceiling(((decimal)parent.GetRowCount() / _readerOptions.Count))); } - catch (Exception e) when (e is OutOfMemoryException || e is OverflowException) + catch (OverflowException) { - parent._host.Except(e, "Error due to too many blocks. Try increasing block size."); + // this exception is thrown when number of blocks exceeds int.MaxValue + throw _loader._host.ExceptParam("ColumnChunkReadSize", "Error due to too many blocks. Try increasing block size."); } + try + { + blockOrder = _rand == null ? Utils.GetIdentityPermutation(numBlocks) : Utils.GetRandomPermutation(rand, numBlocks); + } + catch (OutOfMemoryException) + { + // This exception is thrown when attempting to create an array of more than ~300M elements + throw _loader._host.ExceptParam(_chunkSizeShortName, "Error due to too many blocks. 
Try increasing block size."); + } + _blockEnumerator = blockOrder.Cast().GetEnumerator(); - _dataSetEnumerator = new int[0].GetEnumerator(); // Initialize an empty enumerator to get started + _dataSetEnumerator = new int[0].Cast().GetEnumerator(); // Initialize an empty enumerator to get started _columnValues = new IList[_actives.Length]; _getters = new Delegate[_actives.Length]; for (int i = 0; i < _actives.Length; ++i) @@ -479,7 +494,7 @@ protected override bool MoveNextCore() { if (_dataSetEnumerator.MoveNext()) { - _curDataSetRow = (int)_dataSetEnumerator.Current; + _curDataSetRow = _dataSetEnumerator.Current; return true; } else if (_blockEnumerator.MoveNext()) @@ -493,16 +508,18 @@ protected override bool MoveNextCore() ds = ParquetReader.Read(_loader._parquetStream, _loader._parquetOptions, _readerOptions); } + int[] dataSetOrder; try { - int[] dataSetOrder = _rand == null ? Utils.GetIdentityPermutation(ds.RowCount) : Utils.GetRandomPermutation(_rand, ds.RowCount); - _dataSetEnumerator = dataSetOrder.GetEnumerator(); - _curDataSetRow = dataSetOrder[0]; + dataSetOrder = _rand == null ? Utils.GetIdentityPermutation(ds.RowCount) : Utils.GetRandomPermutation(_rand, ds.RowCount); } - catch (Exception e) when (e is OutOfMemoryException) + catch (OutOfMemoryException) { - throw new InvalidDataException("Error caused because block size is too big. Try decreasing block size.", e); + // This exception will be thrown when trying to create an array that is too big. + throw _loader._host.ExceptParam(_chunkSizeShortName, "Error caused because block size is too big. 
Try decreasing block size."); } + _dataSetEnumerator = dataSetOrder.Cast().GetEnumerator(); + _curDataSetRow = dataSetOrder[0]; // Cache list for each active column for (int i = 0; i < _actives.Length; i++) From 9967d125ddbeeed1d0f23695dea750826555dbc6 Mon Sep 17 00:00:00 2001 From: Mandy Shieh Date: Mon, 14 May 2018 14:23:35 -0700 Subject: [PATCH 4/5] added ceiling to mathutils and have parquetloader default to sequential reading instead of throwing --- src/Microsoft.ML.Core/Utilities/MathUtils.cs | 11 ++++++ src/Microsoft.ML.Parquet/ParquetLoader.cs | 37 +++++++++----------- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/src/Microsoft.ML.Core/Utilities/MathUtils.cs b/src/Microsoft.ML.Core/Utilities/MathUtils.cs index e2848ea25d..059a266ac4 100644 --- a/src/Microsoft.ML.Core/Utilities/MathUtils.cs +++ b/src/Microsoft.ML.Core/Utilities/MathUtils.cs @@ -871,5 +871,16 @@ public static double Cos(double a) var res = Math.Cos(a); return Math.Abs(res) > 1 ? double.NaN : res; } + + /// + /// Returns the smallest integral value that is greater than or equal to the result of the division. + /// + /// Number to be divided. + /// Number with which to divide the numerator. 
+ /// + public static long DivisionCeiling(long numerator, long denomenator) + { + return (numerator + denomenator - 1) / denomenator; + } } } diff --git a/src/Microsoft.ML.Parquet/ParquetLoader.cs b/src/Microsoft.ML.Parquet/ParquetLoader.cs index a05212b7be..4c94669193 100644 --- a/src/Microsoft.ML.Parquet/ParquetLoader.cs +++ b/src/Microsoft.ML.Parquet/ParquetLoader.cs @@ -95,7 +95,6 @@ public sealed class Arguments private readonly Column[] _columnsLoaded; private readonly DataSet _schemaDataSet; private const int _defaultColumnChunkReadSize = 1000000; - private const string _chunkSizeShortName = "chunkSize"; private bool _disposed; @@ -394,29 +393,25 @@ public Cursor(ParquetLoader parent, Func predicate, IRandom rand) // The number of blocks is calculated based on the specified rows in a block (defaults to 1M). // Since we want to shuffle the blocks in addition to shuffling the rows in each block, checks // are put in place to ensure we can produce a shuffle order for the blocks. - int numBlocks; - int[] blockOrder; - try - { - numBlocks = checked((int)Math.Ceiling(((decimal)parent.GetRowCount() / _readerOptions.Count))); - } - catch (OverflowException) + long numBlocks; + IEnumerable blockOrder; + numBlocks = MathUtils.DivisionCeiling((long)parent.GetRowCount(), _readerOptions.Count); + if (numBlocks > int.MaxValue) { - // this exception is thrown when number of blocks exceeds int.MaxValue - throw _loader._host.ExceptParam("ColumnChunkReadSize", "Error due to too many blocks. Try increasing block size."); + throw _loader._host.ExceptParam(nameof(Arguments.ColumnChunkReadSize), "Error due to too many blocks. Try increasing block size."); } try { - blockOrder = _rand == null ? Utils.GetIdentityPermutation(numBlocks) : Utils.GetRandomPermutation(rand, numBlocks); + blockOrder = _rand == null ? 
Enumerable.Range(0, (int)numBlocks) : Utils.GetRandomPermutation(rand, (int)numBlocks); } catch (OutOfMemoryException) { - // This exception is thrown when attempting to create an array of more than ~300M elements - throw _loader._host.ExceptParam(_chunkSizeShortName, "Error due to too many blocks. Try increasing block size."); + // if unable to create a shuffled sequence, default to sequential reading. + blockOrder = Enumerable.Range(0, (int)numBlocks); } - _blockEnumerator = blockOrder.Cast().GetEnumerator(); + _blockEnumerator = blockOrder.GetEnumerator(); - _dataSetEnumerator = new int[0].Cast().GetEnumerator(); // Initialize an empty enumerator to get started + _dataSetEnumerator = Enumerable.Empty().GetEnumerator(); _columnValues = new IList[_actives.Length]; _getters = new Delegate[_actives.Length]; for (int i = 0; i < _actives.Length; ++i) @@ -508,18 +503,18 @@ protected override bool MoveNextCore() ds = ParquetReader.Read(_loader._parquetStream, _loader._parquetOptions, _readerOptions); } - int[] dataSetOrder; + IEnumerable dataSetOrder; try { - dataSetOrder = _rand == null ? Utils.GetIdentityPermutation(ds.RowCount) : Utils.GetRandomPermutation(_rand, ds.RowCount); + dataSetOrder = _rand == null ? Enumerable.Range(0, ds.RowCount) : Utils.GetRandomPermutation(_rand, ds.RowCount); } catch (OutOfMemoryException) { - // This exception will be thrown when trying to create an array that is too big. - throw _loader._host.ExceptParam(_chunkSizeShortName, "Error caused because block size is too big. Try decreasing block size."); + // if unable to create a shuffled sequence, default to sequential reading. 
+ dataSetOrder = Enumerable.Range(0, ds.RowCount); } - _dataSetEnumerator = dataSetOrder.Cast().GetEnumerator(); - _curDataSetRow = dataSetOrder[0]; + _dataSetEnumerator = dataSetOrder.GetEnumerator(); + _curDataSetRow = dataSetOrder.ElementAt(0); // Cache list for each active column for (int i = 0; i < _actives.Length; i++) From ec1e78a645a7ae699f38344a05153a8736ca0860 Mon Sep 17 00:00:00 2001 From: Mandy Shieh Date: Thu, 17 May 2018 12:55:00 -0700 Subject: [PATCH 5/5] factor out sequence creation and add check for overflow in MathUtils --- src/Microsoft.ML.Core/Utilities/MathUtils.cs | 2 +- src/Microsoft.ML.Parquet/ParquetLoader.cs | 45 ++++++++++---------- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/src/Microsoft.ML.Core/Utilities/MathUtils.cs b/src/Microsoft.ML.Core/Utilities/MathUtils.cs index 059a266ac4..8106ff5a2c 100644 --- a/src/Microsoft.ML.Core/Utilities/MathUtils.cs +++ b/src/Microsoft.ML.Core/Utilities/MathUtils.cs @@ -880,7 +880,7 @@ public static double Cos(double a) /// public static long DivisionCeiling(long numerator, long denomenator) { - return (numerator + denomenator - 1) / denomenator; + return (checked(numerator + denomenator) - 1) / denomenator; } } } diff --git a/src/Microsoft.ML.Parquet/ParquetLoader.cs b/src/Microsoft.ML.Parquet/ParquetLoader.cs index 4c94669193..21271f6e5c 100644 --- a/src/Microsoft.ML.Parquet/ParquetLoader.cs +++ b/src/Microsoft.ML.Parquet/ParquetLoader.cs @@ -393,22 +393,12 @@ public Cursor(ParquetLoader parent, Func predicate, IRandom rand) // The number of blocks is calculated based on the specified rows in a block (defaults to 1M). // Since we want to shuffle the blocks in addition to shuffling the rows in each block, checks // are put in place to ensure we can produce a shuffle order for the blocks. 
- long numBlocks; - IEnumerable blockOrder; - numBlocks = MathUtils.DivisionCeiling((long)parent.GetRowCount(), _readerOptions.Count); + var numBlocks = MathUtils.DivisionCeiling((long)parent.GetRowCount(), _readerOptions.Count); if (numBlocks > int.MaxValue) { throw _loader._host.ExceptParam(nameof(Arguments.ColumnChunkReadSize), "Error due to too many blocks. Try increasing block size."); } - try - { - blockOrder = _rand == null ? Enumerable.Range(0, (int)numBlocks) : Utils.GetRandomPermutation(rand, (int)numBlocks); - } - catch (OutOfMemoryException) - { - // if unable to create a shuffled sequence, default to sequential reading. - blockOrder = Enumerable.Range(0, (int)numBlocks); - } + var blockOrder = CreateOrderSequence((int)numBlocks); _blockEnumerator = blockOrder.GetEnumerator(); _dataSetEnumerator = Enumerable.Empty().GetEnumerator(); @@ -503,16 +493,7 @@ protected override bool MoveNextCore() ds = ParquetReader.Read(_loader._parquetStream, _loader._parquetOptions, _readerOptions); } - IEnumerable dataSetOrder; - try - { - dataSetOrder = _rand == null ? Enumerable.Range(0, ds.RowCount) : Utils.GetRandomPermutation(_rand, ds.RowCount); - } - catch (OutOfMemoryException) - { - // if unable to create a shuffled sequence, default to sequential reading. - dataSetOrder = Enumerable.Range(0, ds.RowCount); - } + var dataSetOrder = CreateOrderSequence(ds.RowCount); _dataSetEnumerator = dataSetOrder.GetEnumerator(); _curDataSetRow = dataSetOrder.ElementAt(0); @@ -559,6 +540,26 @@ public bool IsColumnActive(int col) Ch.CheckParam(0 <= col && col < _colToActivesIndex.Length, nameof(col)); return _colToActivesIndex[col] >= 0; } + + /// + /// Creates an in-order or shuffled sequence, based on whether _rand is specified. + /// If unable to create a shuffled sequence, will default to sequential. + /// + /// Number of elements in the sequence. + /// + private IEnumerable CreateOrderSequence(int size) + { + IEnumerable order; + try + { + order = _rand == null ?
Enumerable.Range(0, size) : Utils.GetRandomPermutation(_rand, size); + } + catch (OutOfMemoryException) + { + order = Enumerable.Range(0, size); + } + return order; + } } #region Dispose