From 7239f219deb418803826e7b2915ae6c9b1991621 Mon Sep 17 00:00:00 2001 From: "pieths.dev@gmail.com" Date: Thu, 29 Aug 2019 16:56:22 -0700 Subject: [PATCH 1/3] Initial implementation of csr_matrix output support. --- src/DotNetBridge/NativeDataInterop.cs | 360 +++++++++++++++++- src/DotNetBridge/RunGraph.cs | 11 +- src/NativeBridge/ManagedInterop.cpp | 2 +- src/python/nimbusml.pyproj | 1 + .../nimbusml/internal/utils/data_stream.py | 8 +- .../nimbusml/internal/utils/dataframes.py | 11 +- .../nimbusml/internal/utils/entrypoints.py | 49 ++- src/python/nimbusml/model_selection/cv.py | 4 +- src/python/nimbusml/pipeline.py | 34 +- .../nimbusml/tests/test_csr_matrix_output.py | 178 +++++++++ src/python/nimbusml/tests/test_entrypoints.py | 4 +- 11 files changed, 625 insertions(+), 37 deletions(-) create mode 100644 src/python/nimbusml/tests/test_csr_matrix_output.py diff --git a/src/DotNetBridge/NativeDataInterop.cs b/src/DotNetBridge/NativeDataInterop.cs index c9b70526..aca91038 100644 --- a/src/DotNetBridge/NativeDataInterop.cs +++ b/src/DotNetBridge/NativeDataInterop.cs @@ -93,7 +93,7 @@ public ColumnMetadataInfo(bool expand, string[] slotNames, Dictionary infos = null) + private static unsafe void SendViewToNativeAsDataFrame(IChannel ch, EnvironmentBlock* penv, IDataView view, Dictionary infos = null) { Contracts.AssertValue(ch); Contracts.Assert(penv != null); @@ -317,6 +317,135 @@ private static unsafe void SendViewToNative(IChannel ch, EnvironmentBlock* penv, } } + private static unsafe void SendViewToNativeAsCsr(IChannel ch, EnvironmentBlock* penv, IDataView view) + { + Contracts.AssertValue(ch); + Contracts.Assert(penv != null); + Contracts.AssertValue(view); + if (penv->dataSink == null) + { + // Environment doesn't want any data! + return; + } + + var dataSink = MarshalDelegate(penv->dataSink); + + var schema = view.Schema; + var colIndices = new List(); + var outputDataKind = InternalDataKind.R4; + + int numOutputRows = 0; + int numOutputCols = 0; + + for (int col = 0; col < schema.Count; col++) + { + if (schema[col].IsHidden) + continue; + + var fullType = schema[col].Type; + var itemType = fullType.GetItemType(); + int valueCount = fullType.GetValueCount(); + + if (valueCount == 0) + { + throw ch.ExceptNotSupp("Column has variable length vector: " + + schema[col].Name + ". Not supported in python. Drop column before sending to Python"); + } + + if (itemType.IsStandardScalar()) + { + switch (itemType.GetRawKind()) + { + default: + throw Contracts.Except("Data type {0} not supported", itemType.GetRawKind()); + + case InternalDataKind.I1: + case InternalDataKind.I2: + case InternalDataKind.U1: + case InternalDataKind.U2: + case InternalDataKind.R4: + break; + + case InternalDataKind.I4: + case InternalDataKind.U4: + case InternalDataKind.R8: + outputDataKind = InternalDataKind.R8; + break; + } + } + else + { + throw Contracts.Except("Data type {0} not supported", itemType.GetRawKind()); + } + + colIndices.Add(col); + numOutputCols += valueCount; + } + + var allNames = new HashSet(); + var nameIndices = new List(); + var nameUtf8Bytes = new List(); + + AddUniqueName("data", allNames, nameIndices, nameUtf8Bytes); + AddUniqueName("indices", allNames, nameIndices, nameUtf8Bytes); + AddUniqueName("indptr", allNames, nameIndices, nameUtf8Bytes); + AddUniqueName("shape", allNames, nameIndices, nameUtf8Bytes); + + var kindList = new List {outputDataKind, + InternalDataKind.I4, + InternalDataKind.I4, + InternalDataKind.I4}; + + var kinds = kindList.ToArray(); + var nameBytes = nameUtf8Bytes.ToArray(); + var names = new byte*[allNames.Count]; + + fixed (InternalDataKind* prgkind = kinds) + fixed (byte* prgbNames = nameBytes) + fixed (byte** prgname = names) + { + for (int iid = 0; iid < names.Length; iid++) + names[iid] = prgbNames + nameIndices[iid]; + + DataViewBlock block; + block.ccol = allNames.Count; + block.crow = view.GetRowCount() ?? 0; + block.names = (sbyte**)prgname; + block.kinds = prgkind; + block.keyCards = null; + + dataSink(penv, &block, out var setters, out var keyValueSetter); + + if (setters == null) return; + + using (var cursor = view.GetRowCursor(view.Schema.Where(col => colIndices.Contains(col.Index)))) + { + CsrData csrData = new CsrData(penv, setters, outputDataKind); + var fillers = new CsrFillerBase[colIndices.Count]; + + for (int i = 0; i < colIndices.Count; i++) + { + var type = schema[colIndices[i]].Type; + fillers[i] = CsrFillerBase.Create(penv, cursor, colIndices[i], type, outputDataKind, csrData); + } + + for (;; numOutputRows++) + { + if (!cursor.MoveNext()) break; + + for (int i = 0; i < fillers.Length; i++) + { + fillers[i].Set(); + } + + csrData.IncrementRow(); + } + + csrData.SetShape(numOutputRows, numOutputCols); + } + } + } + private static string AddUniqueName(string name, HashSet allNames, List nameIndices, List nameUtf8Bytes) { string newName = name; @@ -547,5 +676,234 @@ public override void Set() } } } + + private unsafe class CsrData + { + private const int DataCol = 0; + private const int IndicesCol = 1; + private const int IndPtrCol = 2; + private const int ShapeCol = 3; + + private readonly R4Setter _r4DataSetter; + private readonly R8Setter _r8DataSetter; + private readonly I4Setter _indicesSetter; + private readonly I4Setter _indptrSetter; + private readonly I4Setter _shapeSetter; + + public int col; + + private int _row; + private int _index; + + private EnvironmentBlock* _penv; + + public CsrData(EnvironmentBlock* penv, void** setters, InternalDataKind outputDataKind) + { + col = 0; + + _row = 0; + _index = 0; + _penv = penv; + + if (outputDataKind == InternalDataKind.R4) + { + _r4DataSetter = MarshalDelegate(setters[DataCol]); + _r8DataSetter = null; + } + else if(outputDataKind == InternalDataKind.R8) + { + _r4DataSetter = null; + _r8DataSetter = MarshalDelegate(setters[DataCol]); + } + + _indicesSetter = MarshalDelegate(setters[IndicesCol]); + _indptrSetter = MarshalDelegate(setters[IndPtrCol]); + _shapeSetter = MarshalDelegate(setters[ShapeCol]); + + _indptrSetter(_penv, IndPtrCol, 0, 0); + } + + public void AppendR4(float value, int col) + { + _r4DataSetter(_penv, DataCol, _index, value); + _indicesSetter(_penv, IndicesCol, _index, col); + _index++; + } + + public void AppendR8(double value, int col) + { + _r8DataSetter(_penv, DataCol, _index, value); + _indicesSetter(_penv, IndicesCol, _index, col); + _index++; + } + + public void IncrementRow() + { + col = 0; + _row++; + + _indptrSetter(_penv, IndPtrCol, _row, _index); + } + + public void SetShape(int m, int n) + { + _shapeSetter(_penv, ShapeCol, 0, m); + _shapeSetter(_penv, ShapeCol, 1, n); + } + } + + private abstract unsafe class CsrFillerBase + { + public delegate void DataAppender(T value, int col); + + protected CsrFillerBase() {} + + public static CsrFillerBase Create(EnvironmentBlock* penv, + DataViewRow input, + int idvCol, + DataViewType idvColType, + InternalDataKind outputDataKind, + CsrData csrData) + { + if (outputDataKind == InternalDataKind.R4) + { + switch (idvColType.GetItemType().GetRawKind()) + { + case InternalDataKind.I1: + DataAppender appendI1 = (sbyte val, int i) => csrData.AppendR4((float)val, i); + return new CsrFiller(input, idvCol, idvColType, appendI1, csrData); + case InternalDataKind.I2: + DataAppender appendI2 = (short val, int i) => csrData.AppendR4((float)val, i); + return new CsrFiller(input, idvCol, idvColType, appendI2, csrData); + case InternalDataKind.U1: + DataAppender appendU1 = (byte val, int i) => csrData.AppendR4((float)val, i); + return new CsrFiller(input, idvCol, idvColType, appendU1, csrData); + case InternalDataKind.U2: + DataAppender appendU2 = (ushort val, int i) => csrData.AppendR4((float)val, i); + return new CsrFiller(input, idvCol, idvColType, appendU2, csrData); + case InternalDataKind.R4: + DataAppender appendR4 = (float val, int i) => csrData.AppendR4((float)val, i); + return new CsrFiller(input, idvCol, idvColType, appendR4, csrData); + default: + throw Contracts.Except("Source data type not supported"); + } + } + else if (outputDataKind == InternalDataKind.R8) + { + switch (idvColType.GetItemType().GetRawKind()) + { + case InternalDataKind.I1: + DataAppender appendI1 = (sbyte val, int i) => csrData.AppendR8((double)val, i); + return new CsrFiller(input, idvCol, idvColType, appendI1, csrData); + case InternalDataKind.I2: + DataAppender appendI2 = (short val, int i) => csrData.AppendR8((double)val, i); + return new CsrFiller(input, idvCol, idvColType, appendI2, csrData); + case InternalDataKind.I4: + DataAppender appendI4 = (int val, int i) => csrData.AppendR8((double)val, i); + return new CsrFiller(input, idvCol, idvColType, appendI4, csrData); + case InternalDataKind.U1: + DataAppender appendU1 = (byte val, int i) => csrData.AppendR8((double)val, i); + return new CsrFiller(input, idvCol, idvColType, appendU1, csrData); + case InternalDataKind.U2: + DataAppender appendU2 = (ushort val, int i) => csrData.AppendR8((double)val, i); + return new CsrFiller(input, idvCol, idvColType, appendU2, csrData); + case InternalDataKind.U4: + DataAppender appendU4 = (uint val, int i) => csrData.AppendR8((double)val, i); + return new CsrFiller(input, idvCol, idvColType, appendU4, csrData); + case InternalDataKind.R4: + DataAppender appendR4 = (float val, int i) => csrData.AppendR8((double)val, i); + return new CsrFiller(input, idvCol, idvColType, appendR4, csrData); + case InternalDataKind.R8: + DataAppender appendR8 = (double val, int i) => csrData.AppendR8((double)val, i); + return new CsrFiller(input, idvCol, idvColType, appendR8, csrData); + default: + throw Contracts.Except("Source data type not supported"); + } + } + + throw Contracts.Except("Target data type not supported."); + } + + public abstract void Set(); + + private sealed class CsrFiller : CsrFillerBase + { + private readonly ValueGetter> _getVec; + private readonly ValueGetter _get; + private VBuffer _buffer; + + private CsrData _csrData; + private readonly DataAppender _dataAppender; + + private readonly IEqualityComparer comparer = EqualityComparer.Default; + + public CsrFiller(DataViewRow input, + int idvColIndex, + DataViewType type, + DataAppender dataAppender, + CsrData csrData) + : base() + { + Contracts.AssertValue(input); + Contracts.Assert(0 <= idvColIndex && idvColIndex < input.Schema.Count); + + if (type is VectorDataViewType) + _getVec = RowCursorUtils.GetVecGetterAs((PrimitiveDataViewType)type.GetItemType(), input, idvColIndex); + else + _get = RowCursorUtils.GetGetterAs(type, input, idvColIndex); + + _csrData = csrData; + _dataAppender = dataAppender; + } + + public bool IsDefault(TSrc t) + { + return comparer.Equals(t, default(TSrc)); + } + + public override void Set() + { + if (_getVec != null) + { + _getVec(ref _buffer); + if (_buffer.IsDense) + { + var values = _buffer.GetValues(); + + for (int i = 0; i < values.Length; i++) + { + if (!IsDefault(values[i])) + _dataAppender(values[i], _csrData.col); + + _csrData.col++; + } + } + else + { + var values = _buffer.GetValues(); + var indices = _buffer.GetIndices(); + + for (int i = 0; i < values.Length; i++) + { + if (!IsDefault(values[i])) + _dataAppender(values[i], _csrData.col + indices[i]); + } + + _csrData.col += _buffer.Length; + } + } + else + { + TSrc value = default(TSrc); + _get(ref value); + + if (!IsDefault(value)) + _dataAppender(value, _csrData.col); + + _csrData.col++; + } + } + } + } } } diff --git a/src/DotNetBridge/RunGraph.cs b/src/DotNetBridge/RunGraph.cs index 97c39730..932a8aa6 100644 --- a/src/DotNetBridge/RunGraph.cs +++ b/src/DotNetBridge/RunGraph.cs @@ -27,6 +27,9 @@ public unsafe static partial class Bridge // std:null specifier in a graph, used to redirect output to std::null const string STDNULL = ""; + // graph output format specifier, used to output to a sparse csr matrix + const string CSR_MATRIX = ""; + private static void SaveIdvToFile(IDataView idv, string path, IHost host) { if (path == STDNULL) @@ -183,14 +186,18 @@ private static void RunGraphCore(EnvironmentBlock* penv, IHostEnvironment env, s throw host.ExceptNotSupp("File handle outputs not yet supported."); case TlcModule.DataKind.DataView: var idv = runner.GetOutput(varName); - if (!string.IsNullOrWhiteSpace(path)) + if (path == CSR_MATRIX) + { + SendViewToNativeAsCsr(ch, penv, idv); + } + else if (!string.IsNullOrWhiteSpace(path)) { SaveIdvToFile(idv, path, host); } else { var infos = ProcessColumns(ref idv, penv->maxSlots, host); - SendViewToNative(ch, penv, idv, infos); + SendViewToNativeAsDataFrame(ch, penv, idv, infos); } break; case TlcModule.DataKind.PredictorModel: diff --git a/src/NativeBridge/ManagedInterop.cpp b/src/NativeBridge/ManagedInterop.cpp index a8ed7941..e31ea6e3 100644 --- a/src/NativeBridge/ManagedInterop.cpp +++ b/src/NativeBridge/ManagedInterop.cpp @@ -162,7 +162,7 @@ void EnvironmentBlock::DataSinkCore(const DataViewBlock * pdata) throw std::invalid_argument("data type is not supported " + std::to_string(kind)); } - if (pdata->keyCards[i] >= 0) + if (pdata->keyCards && (pdata->keyCards[i] >= 0)) { _vKeyValues.push_back(new PythonObject(TX, pdata->keyCards[i], 1)); _columnToKeyMap.push_back(numKeys++); diff --git a/src/python/nimbusml.pyproj b/src/python/nimbusml.pyproj index 5daea049..f1451167 100644 --- a/src/python/nimbusml.pyproj +++ b/src/python/nimbusml.pyproj @@ -669,6 +669,7 @@ + diff --git a/src/python/nimbusml/internal/utils/data_stream.py b/src/python/nimbusml/internal/utils/data_stream.py index 7d490bc6..5f92b391 100644 --- a/src/python/nimbusml/internal/utils/data_stream.py +++ b/src/python/nimbusml/internal/utils/data_stream.py @@ -419,7 +419,7 @@ def to_df(self): # Do not move these imports or the module fails # due to circular references. from ..entrypoints.transforms_nooperation import transforms_nooperation - from .entrypoints import Graph + from .entrypoints import Graph, DataOutputFormat no_op = transforms_nooperation( data='$data', output_data='$output_data') @@ -427,7 +427,7 @@ def to_df(self): graph = Graph( dict( data=''), dict( - output_data=''), False, *(graph_nodes)) + output_data=''), DataOutputFormat.Default, *(graph_nodes)) (out_model, out_data, out_metrics) = graph.run(verbose=True, X=self) return out_data @@ -438,7 +438,7 @@ def head(self, n=5, skip=0): transforms_rowtakefilter from ..entrypoints.transforms_rowskipfilter import \ transforms_rowskipfilter - from .entrypoints import Graph + from .entrypoints import Graph, DataOutputFormat if n == 0: raise ValueError("n must be > 0") graph_nodes = [] @@ -456,7 +456,7 @@ def head(self, n=5, skip=0): graph = Graph( dict( data=''), dict( - output_data=''), False, *(graph_nodes)) + output_data=''), DataOutputFormat.Default, *(graph_nodes)) (out_model, out_data, out_metrics) = graph.run(verbose=True, X=self) return out_data diff --git a/src/python/nimbusml/internal/utils/dataframes.py b/src/python/nimbusml/internal/utils/dataframes.py index fe46ac20..ea19587c 100644 --- a/src/python/nimbusml/internal/utils/dataframes.py +++ b/src/python/nimbusml/internal/utils/dataframes.py @@ -201,7 +201,7 @@ def get_obj(el): return concat(els, axis=axis, join=join) -def resolve_output(ret): +def resolve_output_as_dataframe(ret): data = dict() for key in ret.keys(): if not isinstance(ret[key], dict): @@ -212,6 +212,15 @@ def resolve_output(ret): return DataFrame(data) +def resolve_output_as_csrmatrix(ret): + matrix = csr_matrix((1, 1)) + + if all([k in ret for k in ('data', 'indices', 'indptr', 'shape')]): + matrix = csr_matrix((ret['data'], ret['indices'], ret['indptr']), + shape=(ret['shape'][0], ret['shape'][1])) + return matrix + + # Any changes to this dictionary must also be done in the enum # ML_PY_TYPE_MAP_ENUM defined in DataViewInterop.h. _global_dtype_to_char_dict = { diff --git a/src/python/nimbusml/internal/utils/entrypoints.py b/src/python/nimbusml/internal/utils/entrypoints.py index 09a808ce..9479479b 100644 --- a/src/python/nimbusml/internal/utils/entrypoints.py +++ b/src/python/nimbusml/internal/utils/entrypoints.py @@ -23,7 +23,7 @@ from .data_stream import BinaryDataStream from .data_stream import FileDataStream from .dataframes import resolve_dataframe, resolve_csr_matrix, pd_concat, \ - resolve_output + resolve_output_as_dataframe, resolve_output_as_csrmatrix from .utils import try_set, set_clr_environment_vars, get_clr_path, \ get_mlnet_path, get_dprep_path from ..libs.pybridge import px_call @@ -141,6 +141,12 @@ def _get_temp_file(suffix=None): return file_name +class DataOutputFormat(Enum): + BinaryDataStream= 'binary_data_stream' + CsrMatrix = 'csr_matrix' + Default = 'default' + + class Graph(EntryPoint): """ graph @@ -166,7 +172,7 @@ def __init__( self, inputs=None, outputs=None, - output_binary_data_stream=False, + data_output_format=DataOutputFormat.Default, *nodes): Graph._check_nodes(nodes) @@ -191,7 +197,7 @@ def __init__( self.nodes = nodes self._write_csv_time = 0 - self._output_binary_data_stream = output_binary_data_stream + self._data_output_format = data_output_format def __iter__(self): return iter(self.nodes) @@ -387,10 +393,13 @@ def remove_multi_level_index(c): output_metricsfilename = _get_temp_file(suffix='.txt') self.outputs['output_metrics'] = output_metricsfilename - if 'output_data' in self.outputs and \ - self._output_binary_data_stream: - output_idvfilename = _get_temp_file(suffix='.idv') - self.outputs['output_data'] = output_idvfilename + if 'output_data' in self.outputs: + if self._data_output_format == DataOutputFormat.BinaryDataStream: + output_idvfilename = _get_temp_file(suffix='.idv') + self.outputs['output_data'] = output_idvfilename + + elif self._data_output_format == DataOutputFormat.CsrMatrix: + self.outputs['output_data'] = "" # set graph file for debuggings if verbose > 0: @@ -425,15 +434,21 @@ def remove_multi_level_index(c): concatenated, output_modelfilename) - out_data = resolve_output(ret) - # remove label column from data - if out_data is not None and concatenated: - out_columns = list(out_data.columns) - if hasattr(y, 'columns'): - y_column = y.columns[0] - if y_column in out_columns: - out_columns.remove(y_column) - out_data = out_data[out_columns] + out_data = None + + if not cv and self._data_output_format == DataOutputFormat.CsrMatrix: + out_data = resolve_output_as_csrmatrix(ret) + else: + out_data = resolve_output_as_dataframe(ret) + # remove label column from data + if out_data is not None and concatenated: + out_columns = list(out_data.columns) + if hasattr(y, 'columns'): + y_column = y.columns[0] + if y_column in out_columns: + out_columns.remove(y_column) + out_data = out_data[out_columns] + if output_metricsfilename: out_metrics = pd.read_csv( output_metricsfilename, @@ -446,7 +461,7 @@ def remove_multi_level_index(c): if cv: return self._process_graph_run_results(out_data) - elif self._output_binary_data_stream: + elif self._data_output_format == DataOutputFormat.BinaryDataStream: output = BinaryDataStream(output_idvfilename) return (output_modelfilename, output, out_metrics) else: diff --git a/src/python/nimbusml/model_selection/cv.py b/src/python/nimbusml/model_selection/cv.py index 8a3d1a57..d992399d 100644 --- a/src/python/nimbusml/model_selection/cv.py +++ b/src/python/nimbusml/model_selection/cv.py @@ -17,7 +17,7 @@ transforms_manyheterogeneousmodelcombiner from ..internal.entrypoints.transforms_modelcombiner import \ transforms_modelcombiner -from ..internal.utils.entrypoints import Graph, GraphOutputType +from ..internal.utils.entrypoints import Graph, GraphOutputType, DataOutputFormat # Extension method for extending a list of steps, with chaining @@ -544,7 +544,7 @@ def fit( group_column=group_id) steps.add(cv_node) - graph = Graph(cv_aux_info.inputs, self.outputs, False, *steps) + graph = Graph(cv_aux_info.inputs, self.outputs, DataOutputFormat.Default, *steps) # prepare telemetry info class_name = type(self).__name__ diff --git a/src/python/nimbusml/pipeline.py b/src/python/nimbusml/pipeline.py index 4824e83c..6748a287 100644 --- a/src/python/nimbusml/pipeline.py +++ b/src/python/nimbusml/pipeline.py @@ -70,7 +70,7 @@ from .internal.utils.data_schema import DataSchema from .internal.utils.data_stream import DataStream, ViewDataStream, \ FileDataStream, BinaryDataStream -from .internal.utils.entrypoints import Graph +from .internal.utils.entrypoints import Graph, DataOutputFormat from .internal.utils.schema_helper import _extract_label_column from .internal.utils.utils import trace, unlist @@ -824,10 +824,18 @@ def _fit_graph(self, X, y, verbose, **params): if learner_node is None: # last node is transformer outputs[output_data.replace( '$', '')] = '' if do_fit_transform else '' + + data_output_format = DataOutputFormat.Default + if do_fit_transform: + if output_binary_data_stream: + data_output_format = DataOutputFormat.BinaryDataStream + elif params.pop('as_csr', False): + data_output_format = DataOutputFormat.CsrMatrix + graph = Graph( inputs, outputs, - do_fit_transform and output_binary_data_stream, + data_output_format, *(graph_nodes)) # Checks that every parameter in params was used. @@ -1774,10 +1782,13 @@ def get_feature_contributions(self, X, top=10, bottom=10, verbose=0, outputs = dict(output_data="") + data_output_format = DataOutputFormat.BinaryDataStream if as_binary_data_stream \ + else DataOutputFormat.Default, + graph = Graph( inputs, outputs, - as_binary_data_stream, + data_output_format, *all_nodes) class_name = type(self).__name__ @@ -1903,10 +1914,13 @@ def _predict(self, X, y=None, else: outputs = dict(output_data="") + data_output_format = DataOutputFormat.BinaryDataStream if as_binary_data_stream \ + else DataOutputFormat.Default, + graph = Graph( inputs, outputs, - as_binary_data_stream, + data_output_format, *all_nodes) class_name = type(self).__name__ @@ -2241,10 +2255,16 @@ def transform( all_nodes.extend([apply_node]) + data_output_format = DataOutputFormat.Default + if as_binary_data_stream: + data_output_format = DataOutputFormat.BinaryDataStream + elif params.pop('as_csr', False): + data_output_format = DataOutputFormat.CsrMatrix + graph = Graph( inputs, dict(output_data=""), - as_binary_data_stream, + data_output_format, *all_nodes) class_name = type(self).__name__ @@ -2316,7 +2336,7 @@ def summary(self, verbose=0, **params): graph = Graph( inputs, outputs, - False, + DataOutputFormat.Default, *all_nodes) class_name = type(self).__name__ @@ -2566,7 +2586,7 @@ def combine_models(cls, *items, **params): graph = Graph( inputs, outputs, - False, + DataOutputFormat.Default, *nodes) class_name = cls.__name__ diff --git a/src/python/nimbusml/tests/test_csr_matrix_output.py b/src/python/nimbusml/tests/test_csr_matrix_output.py new file mode 100644 index 00000000..dab95f1d --- /dev/null +++ b/src/python/nimbusml/tests/test_csr_matrix_output.py @@ -0,0 +1,178 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +# -------------------------------------------------------------------------------------------- + +import unittest + +import numpy as np +import pandas as pd +from nimbusml import Pipeline +from nimbusml.feature_extraction.categorical import OneHotVectorizer +from nimbusml.preprocessing.schema import ColumnConcatenator, ColumnDropper +from scipy.sparse import csr_matrix + + +class TestCsrMatrixOutput(unittest.TestCase): + + def test_column_dropped_output_produces_expected_result(self): + train_data = {'c1': [1, 0, 0, 4], + 'c2': [2, 3, 0, 5], + 'c3': [3, 4, 5, 6]} + train_df = pd.DataFrame(train_data).astype(np.float32) + + xf = ColumnDropper(columns=['c3']) + xf.fit(train_df) + result = xf.transform(train_df, as_csr=True) + + self.assertEqual(result.nnz, 5) + self.assertTrue(type(result) == csr_matrix) + result = pd.DataFrame(result.todense()) + + train_data = {0: [1, 0, 0, 4], + 1: [2, 3, 0, 5]} + expected_result = pd.DataFrame(train_data).astype(np.float32) + + self.assertTrue(result.equals(expected_result)) + + def test_fit_transform_produces_expected_result(self): + train_data = {'c1': [1, 0, 0, 4], + 'c2': [2, 3, 0, 5], + 'c3': [3, 4, 5, 6]} + train_df = pd.DataFrame(train_data).astype(np.float32) + + xf = ColumnDropper(columns=['c3']) + result = xf.fit_transform(train_df, as_csr=True) + + self.assertEqual(result.nnz, 5) + self.assertTrue(type(result) == csr_matrix) + result = pd.DataFrame(result.todense()) + + train_data = {0: [1, 0, 0, 4], + 1: [2, 3, 0, 5]} + expected_result = pd.DataFrame(train_data).astype(np.float32) + + self.assertTrue(result.equals(expected_result)) + + def test_vector_column_combined_with_single_value_columns(self): + train_data = {'c1': [1, 0, 0, 4], + 'c2': [2, 3, 0, 5], + 'c3': [3, 4, 5, 6]} + train_df = pd.DataFrame(train_data).astype(np.float32) + + xf = ColumnConcatenator(columns={'features': ['c1', 'c2', 'c3']}) + xf.fit(train_df) + result = xf.transform(train_df, as_csr=True) + + self.assertEqual(result.nnz, 18) + self.assertTrue(type(result) == csr_matrix) + result = pd.DataFrame(result.todense()) + + train_data = {0: [1, 0, 0, 4], + 1: [2, 3, 0, 5], + 2: [3, 4, 5, 6], + 3: [1, 0, 0, 4], + 4: [2, 3, 0, 5], + 5: [3, 4, 5, 6]} + expected_result = pd.DataFrame(train_data).astype(np.float32) + self.assertTrue(result.equals(expected_result)) + + def test_sparse_vector_column(self): + train_data = {'c0': ['a', 'b', 'a', 'b'], + 'c1': ['c', 'd', 'd', 'c']} + train_df = pd.DataFrame(train_data) + + xf = OneHotVectorizer(columns={'c0':'c0', 'c1':'c1'}) + xf.fit(train_df) + expected_result = xf.transform(train_df) + self.assertTrue(type(expected_result) == pd.DataFrame) + + result = xf.transform(train_df, as_csr=True) + self.assertEqual(result.nnz, 8) + self.assertTrue(type(result) == csr_matrix) + + result = pd.DataFrame(result.todense(), columns=['c0.a', 'c0.b', 'c1.c', 'c1.d']) + + self.assertTrue(result.equals(expected_result)) + + def test_sparse_vector_column_combined_with_single_value_columns(self): + train_data = {'c0': [0, 1, 0, 3], + 'c1': ['a', 'b', 'a', 'b']} + train_df = pd.DataFrame(train_data).astype({'c0': np.float32}) + + xf = OneHotVectorizer(columns={'c1':'c1'}) + xf.fit(train_df) + expected_result = xf.transform(train_df) + self.assertTrue(type(expected_result) == pd.DataFrame) + + result = xf.transform(train_df, as_csr=True) + self.assertEqual(result.nnz, 6) + self.assertTrue(type(result) == csr_matrix) + + result = pd.DataFrame(result.todense(), columns=['c0', 'c1.a', 'c1.b']) + + self.assertTrue(result.equals(expected_result)) + + def test_types_convertable_to_r4_get_output_as_r4(self): + train_data = {'c1': [1, 0, 0, 4], + 'c2': [2, 3, 0, 5], + 'c3': [3, 4, 5, 6], + 'c4': [4, 5, 6, 7]} + train_df = pd.DataFrame(train_data).astype({'c1': np.ubyte, + 'c2': np.short, + 'c3': np.float32}) + + xf = ColumnDropper(columns=['c4']) + xf.fit(train_df) + result = xf.transform(train_df, as_csr=True) + + self.assertTrue(type(result) == csr_matrix) + self.assertEqual(result.nnz, 9) + result = pd.DataFrame(result.todense()) + + train_data = {0: [1, 0, 0, 4], + 1: [2, 3, 0, 5], + 2: [3, 4, 5, 6]} + expected_result = pd.DataFrame(train_data).astype(np.float32) + + self.assertTrue(result.equals(expected_result)) + + self.assertEqual(result.dtypes[0], np.float32) + self.assertEqual(result.dtypes[1], np.float32) + self.assertEqual(result.dtypes[2], np.float32) + + def test_types_convertable_to_r8_get_output_as_r8(self): + train_data = {'c1': [1, 0, 0, 4], + 'c2': [2, 3, 0, 5], + 'c3': [3, 0, 5, 0], + 'c4': [0, 5, 6, 7], + 'c5': [5, 6, 7, 8]} + train_df = pd.DataFrame(train_data).astype({'c1': np.ubyte, + 'c2': np.short, + 'c3': np.float32, + 'c4': np.float64}) + + xf = ColumnDropper(columns=['c5']) + xf.fit(train_df) + result = xf.transform(train_df, as_csr=True) + + self.assertTrue(type(result) == csr_matrix) + self.assertEqual(result.nnz, 10) + result = pd.DataFrame(result.todense()) + + train_data = {0: [1, 0, 0, 4], + 1: [2, 3, 0, 5], + 2: [3, 0, 5, 0], + 3: [0, 5, 6, 7]} + expected_result = pd.DataFrame(train_data).astype(np.float64) + + self.assertTrue(result.equals(expected_result)) + + self.assertEqual(result.dtypes[0], np.float64) + self.assertEqual(result.dtypes[1], np.float64) + self.assertEqual(result.dtypes[2], np.float64) + self.assertEqual(result.dtypes[3], np.float64) + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/nimbusml/tests/test_entrypoints.py b/src/python/nimbusml/tests/test_entrypoints.py index 257d5bef..ed36efbb 100644 --- a/src/python/nimbusml/tests/test_entrypoints.py +++ b/src/python/nimbusml/tests/test_entrypoints.py @@ -11,7 +11,7 @@ from nimbusml.internal.entrypoints.transforms_twoheterogeneousmodelcombiner \ import \ transforms_twoheterogeneousmodelcombiner -from nimbusml.internal.utils.entrypoints import EntryPoint, Graph +from nimbusml.internal.utils.entrypoints import EntryPoint, Graph, DataOutputFormat # from imp import reload @@ -116,7 +116,7 @@ def test_logistic_regression_graph(self): graph = Graph( dict( input_data=""), dict( - output_model=""), False, *all_nodes) + output_model=""), DataOutputFormat.Default, *all_nodes) # print(graph) graph.run(X=None, dryrun=True) From ef65cdb5bddc57624784eaf3cade7b4578f73d1c Mon Sep 17 00:00:00 2001 From: "pieths.dev@gmail.com" Date: Thu, 29 Aug 2019 17:27:18 -0700 Subject: [PATCH 2/3] Whitespace change to kick off another build. The CentOs test run crashed. --- src/python/nimbusml/tests/test_csr_matrix_output.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/nimbusml/tests/test_csr_matrix_output.py b/src/python/nimbusml/tests/test_csr_matrix_output.py index dab95f1d..37ef6180 100644 --- a/src/python/nimbusml/tests/test_csr_matrix_output.py +++ b/src/python/nimbusml/tests/test_csr_matrix_output.py @@ -10,7 +10,7 @@ from nimbusml import Pipeline from nimbusml.feature_extraction.categorical import OneHotVectorizer from nimbusml.preprocessing.schema import ColumnConcatenator, ColumnDropper -from scipy.sparse import csr_matrix +from scipy.sparse import csr_matrix class TestCsrMatrixOutput(unittest.TestCase): From cc64a3375ca810626046c25422c2e5eb20008a81 Mon Sep 17 00:00:00 2001 From: Gani Nazirov Date: Mon, 9 Sep 2019 13:30:16 -0700 Subject: [PATCH 3/3] Rename as per comment --- .../nimbusml/internal/utils/data_stream.py | 6 ++--- .../nimbusml/internal/utils/entrypoints.py | 19 ++++++++------- src/python/nimbusml/model_selection/cv.py | 2 +- src/python/nimbusml/pipeline.py | 24 +++++++++---------- src/python/nimbusml/tests/test_entrypoints.py | 2 +- 5 files changed, 28 insertions(+), 25 deletions(-) diff --git a/src/python/nimbusml/internal/utils/data_stream.py b/src/python/nimbusml/internal/utils/data_stream.py index 5f92b391..675ba148 100644 --- a/src/python/nimbusml/internal/utils/data_stream.py +++ b/src/python/nimbusml/internal/utils/data_stream.py @@ -399,7 +399,7 @@ def __init__(self, parent, columns): class BinaryDataStream(DataStream): """ - Defines a data view. + Data accessor for IDV data format, see here https://github.com/dotnet/machinelearning/blob/master/docs/code/IDataViewImplementation.md """ def __init__(self, filename): @@ -427,7 +427,7 @@ def to_df(self): graph = Graph( dict( data=''), dict( - output_data=''), DataOutputFormat.Default, *(graph_nodes)) + output_data=''), DataOutputFormat.DF, *(graph_nodes)) (out_model, out_data, out_metrics) = graph.run(verbose=True, X=self) return out_data @@ -456,7 +456,7 @@ def head(self, n=5, skip=0): graph = Graph( dict( data=''), dict( - output_data=''), DataOutputFormat.Default, *(graph_nodes)) + output_data=''), DataOutputFormat.DF, *(graph_nodes)) (out_model, out_data, out_metrics) = graph.run(verbose=True, X=self) return out_data diff --git a/src/python/nimbusml/internal/utils/entrypoints.py b/src/python/nimbusml/internal/utils/entrypoints.py index 9479479b..28127b1d 100644 --- a/src/python/nimbusml/internal/utils/entrypoints.py +++ b/src/python/nimbusml/internal/utils/entrypoints.py @@ -142,9 +142,12 @@ def _get_temp_file(suffix=None): class DataOutputFormat(Enum): - BinaryDataStream= 'binary_data_stream' - CsrMatrix = 'csr_matrix' - Default = 'default' + # Regular pandas dataframe format + DF = 0 + # IDV data format, see here https://github.com/dotnet/machinelearning/blob/master/docs/code/IDataViewImplementation.md + IDV = 1 + # csr_matrix sparse data format + CSR = 2 class Graph(EntryPoint): @@ -172,7 +175,7 @@ def __init__( self, inputs=None, outputs=None, - data_output_format=DataOutputFormat.Default, + data_output_format=DataOutputFormat.DF, *nodes): Graph._check_nodes(nodes) @@ -394,11 +397,11 @@ def remove_multi_level_index(c): self.outputs['output_metrics'] = output_metricsfilename if 'output_data' in self.outputs: - if self._data_output_format == DataOutputFormat.BinaryDataStream: + if self._data_output_format == DataOutputFormat.IDV: output_idvfilename = _get_temp_file(suffix='.idv') self.outputs['output_data'] = output_idvfilename - elif self._data_output_format == DataOutputFormat.CsrMatrix: + elif self._data_output_format == DataOutputFormat.CSR: self.outputs['output_data'] = "" # set graph file for debuggings @@ -436,7 +439,7 @@ def remove_multi_level_index(c): out_data = None - if not cv and self._data_output_format == DataOutputFormat.CsrMatrix: + if not cv and self._data_output_format == DataOutputFormat.CSR: out_data = resolve_output_as_csrmatrix(ret) else: out_data = resolve_output_as_dataframe(ret) @@ -461,7 +464,7 @@ def remove_multi_level_index(c): if cv: return self._process_graph_run_results(out_data) - elif self._data_output_format == DataOutputFormat.BinaryDataStream: + elif self._data_output_format == DataOutputFormat.IDV: output = BinaryDataStream(output_idvfilename) return (output_modelfilename, output, out_metrics) else: diff --git a/src/python/nimbusml/model_selection/cv.py b/src/python/nimbusml/model_selection/cv.py index d992399d..79a5def4 100644 --- a/src/python/nimbusml/model_selection/cv.py +++ b/src/python/nimbusml/model_selection/cv.py @@ -544,7 +544,7 @@ def fit( group_column=group_id) steps.add(cv_node) - graph = Graph(cv_aux_info.inputs, self.outputs, DataOutputFormat.Default, *steps) + graph = Graph(cv_aux_info.inputs, self.outputs, DataOutputFormat.DF, *steps) # prepare telemetry info class_name = type(self).__name__ diff --git a/src/python/nimbusml/pipeline.py b/src/python/nimbusml/pipeline.py index 6748a287..ade5744e 100644 --- a/src/python/nimbusml/pipeline.py +++ b/src/python/nimbusml/pipeline.py @@ -825,12 +825,12 @@ def _fit_graph(self, X, y, verbose, **params): outputs[output_data.replace( '$', '')] = '' if do_fit_transform else '' - data_output_format = DataOutputFormat.Default + data_output_format = DataOutputFormat.DF if do_fit_transform: if output_binary_data_stream: - data_output_format = DataOutputFormat.BinaryDataStream + data_output_format = DataOutputFormat.IDV elif params.pop('as_csr', False): - data_output_format = DataOutputFormat.CsrMatrix + data_output_format = DataOutputFormat.CSR graph = Graph( inputs, @@ -1782,8 +1782,8 @@ def get_feature_contributions(self, X, top=10, bottom=10, verbose=0, outputs = dict(output_data="") - data_output_format = DataOutputFormat.BinaryDataStream if as_binary_data_stream \ - else DataOutputFormat.Default, + data_output_format = DataOutputFormat.IDV if as_binary_data_stream \ + else DataOutputFormat.DF, graph = Graph( inputs, @@ -1914,8 +1914,8 @@ def _predict(self, X, y=None, else: outputs = dict(output_data="") - data_output_format = DataOutputFormat.BinaryDataStream if as_binary_data_stream \ - else DataOutputFormat.Default, + data_output_format = DataOutputFormat.IDV if as_binary_data_stream \ + else DataOutputFormat.DF, graph = Graph( inputs, @@ -2255,11 +2255,11 @@ def transform( all_nodes.extend([apply_node]) - data_output_format = DataOutputFormat.Default + data_output_format = DataOutputFormat.DF if as_binary_data_stream: - data_output_format = DataOutputFormat.BinaryDataStream + data_output_format = DataOutputFormat.IDV elif params.pop('as_csr', False): - data_output_format = DataOutputFormat.CsrMatrix + data_output_format = DataOutputFormat.CSR graph = Graph( inputs, @@ -2336,7 +2336,7 @@ def summary(self, verbose=0, **params): graph = Graph( inputs, outputs, - DataOutputFormat.Default, + DataOutputFormat.DF, *all_nodes) class_name = type(self).__name__ @@ -2586,7 +2586,7 @@ def combine_models(cls, *items, **params): graph = Graph( inputs, outputs, - DataOutputFormat.Default, + DataOutputFormat.DF, *nodes) class_name = cls.__name__ diff --git a/src/python/nimbusml/tests/test_entrypoints.py b/src/python/nimbusml/tests/test_entrypoints.py index ed36efbb..c4e53546 100644 --- a/src/python/nimbusml/tests/test_entrypoints.py +++ b/src/python/nimbusml/tests/test_entrypoints.py @@ -116,7 +116,7 @@ def test_logistic_regression_graph(self): graph = Graph( dict( input_data=""), dict( - output_model=""), DataOutputFormat.Default, *all_nodes) + output_model=""), DataOutputFormat.DF, *all_nodes) # print(graph) graph.run(X=None, dryrun=True)