diff --git a/src/DotNetBridge/Bridge.cs b/src/DotNetBridge/Bridge.cs
index 8ae167f5..0aeb27a6 100644
--- a/src/DotNetBridge/Bridge.cs
+++ b/src/DotNetBridge/Bridge.cs
@@ -7,10 +7,8 @@
 using System.Runtime.InteropServices;
 using System.Text;
 using System.Threading;
-using Microsoft.ML;
 using Microsoft.ML.Data;
 using Microsoft.ML.EntryPoints;
-using Microsoft.ML.Model.OnnxConverter;
 using Microsoft.ML.Runtime;
 using Microsoft.ML.Trainers;
 using Microsoft.ML.Trainers.Ensemble;
@@ -19,7 +17,7 @@
 using Microsoft.ML.Transforms;
 using Microsoft.ML.Transforms.TimeSeries;
 
-namespace Microsoft.MachineLearning.DotNetBridge
+namespace Microsoft.ML.DotNetBridge
 {
     /// <summary>
     /// The main entry point from native code. Note that GC / lifetime issues are critical to get correct.
@@ -302,6 +300,7 @@ private static unsafe int GenericExec(EnvironmentBlock* penv, sbyte* psz, int cd
             //env.ComponentCatalog.RegisterAssembly(typeof(TimeSeriesProcessingEntryPoints).Assembly);
             //env.ComponentCatalog.RegisterAssembly(typeof(ParquetLoader).Assembly);
             env.ComponentCatalog.RegisterAssembly(typeof(SsaChangePointDetector).Assembly);
+            env.ComponentCatalog.RegisterAssembly(typeof(DotNetBridgeEntrypoints).Assembly);
 
             using (var ch = host.Start("Executing"))
             {
diff --git a/src/DotNetBridge/Entrypoints.cs b/src/DotNetBridge/Entrypoints.cs
new file mode 100644
index 00000000..f17c3402
--- /dev/null
+++ b/src/DotNetBridge/Entrypoints.cs
@@ -0,0 +1,51 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Microsoft.ML;
+using Microsoft.ML.DotNetBridge;
+using Microsoft.ML.Data;
+using Microsoft.ML.EntryPoints;
+using Microsoft.ML.Runtime;
+using Microsoft.ML.Transforms;
+
+[assembly: LoadableClass(typeof(void), typeof(DotNetBridgeEntrypoints), null, typeof(SignatureEntryPointModule), "DotNetBridgeEntrypoints")]
+
+namespace Microsoft.ML.DotNetBridge
+{
+    internal static class DotNetBridgeEntrypoints
+    {
+        [TlcModule.EntryPoint(Name = "Transforms.PrefixColumnConcatenator", Desc = ColumnConcatenatingTransformer.Summary,
+            UserName = ColumnConcatenatingTransformer.UserName, ShortName = ColumnConcatenatingTransformer.LoadName)]
+        public static CommonOutputs.TransformOutput ConcatColumns(IHostEnvironment env, ColumnCopyingTransformer.Options input)
+        {
+            Contracts.CheckValue(env, nameof(env));
+            var host = env.Register("PrefixConcatColumns");
+            host.CheckValue(input, nameof(input));
+            EntryPointUtils.CheckInputArgs(host, input);
+
+            // Get all column names, preserving their schema order.
+            var colNames = new List<string>(input.Data.Schema.Count);
+            for (int i = 0; i < input.Data.Schema.Count; i++)
+                colNames.Add(input.Data.Schema[i].Name);
+
+            // Iterate through the input options, find the source columns matching each prefix, and create new input options.
+            var inputOptions = new ColumnConcatenatingTransformer.Options() { Data = input.Data };
+            var columns = new List<ColumnConcatenatingTransformer.Column>(input.Columns.Length);
+            foreach (var col in input.Columns)
+            {
+                var newCol = new ColumnConcatenatingTransformer.Column();
+                newCol.Name = col.Name;
+                var prefix = col.Source;
+                newCol.Source = colNames.Where(x => x.StartsWith(prefix, StringComparison.InvariantCulture)).ToArray();
+                if (newCol.Source.Length == 0)
+                    throw new ArgumentOutOfRangeException(nameof(input), "No matching columns found for prefix: " + prefix);
+
+                columns.Add(newCol);
+            }
+            inputOptions.Columns = columns.ToArray();
+
+            var xf = ColumnConcatenatingTransformer.Create(env, inputOptions, inputOptions.Data);
+            return new CommonOutputs.TransformOutput { Model = new TransformModelImpl(env, xf, inputOptions.Data), OutputData = xf };
+        }
+    }
+}
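
Note: the matching rule above is an ordered, culture-invariant StartsWith over the schema's column names. For reviewers who mostly read the Python side, a rough Python analogue of that rule (hypothetical helper, illustration only; Python's startswith is ordinal rather than culture-aware):

def expand_prefixes(schema_columns, mapping):
    # mapping: {output_name: prefix} -> {output_name: [source columns]},
    # preserving schema order and failing when a prefix matches nothing
    # (mirrors the ArgumentOutOfRangeException in the C# above).
    expanded = {}
    for name, prefix in mapping.items():
        sources = [c for c in schema_columns if c.startswith(prefix)]
        if not sources:
            raise ValueError('No matching columns found for prefix: ' + prefix)
        expanded[name] = sources
    return expanded

print(expand_prefixes(['Sepal_Length', 'Sepal_Width', 'Petal_Length'],
                      {'Sepal': 'Sepal_'}))
# {'Sepal': ['Sepal_Length', 'Sepal_Width']}
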
diff --git a/src/DotNetBridge/MessageValidator.cs b/src/DotNetBridge/MessageValidator.cs
index 2aa78c27..4243a45d 100644
--- a/src/DotNetBridge/MessageValidator.cs
+++ b/src/DotNetBridge/MessageValidator.cs
@@ -7,7 +7,7 @@
 using System.Globalization;
 using Microsoft.ML.Runtime;
 
-namespace Microsoft.MachineLearning.DotNetBridge
+namespace Microsoft.ML.DotNetBridge
 {
     /// <summary>
     /// This is a temporary solution to validate the messages from ML.NET to nimbusml.
diff --git a/src/DotNetBridge/NativeDataInterop.cs b/src/DotNetBridge/NativeDataInterop.cs
index aca91038..8d1b7a8c 100644
--- a/src/DotNetBridge/NativeDataInterop.cs
+++ b/src/DotNetBridge/NativeDataInterop.cs
@@ -9,11 +9,10 @@
 using System.Globalization;
 using System.Runtime.InteropServices;
 using System.Text;
-using Microsoft.ML;
 using Microsoft.ML.Data;
 using Microsoft.ML.Runtime;
 
-namespace Microsoft.MachineLearning.DotNetBridge
+namespace Microsoft.ML.DotNetBridge
 {
     public unsafe static partial class Bridge
     {
diff --git a/src/DotNetBridge/NativeDataView.cs b/src/DotNetBridge/NativeDataView.cs
index 09796203..7576781c 100644
--- a/src/DotNetBridge/NativeDataView.cs
+++ b/src/DotNetBridge/NativeDataView.cs
@@ -8,13 +8,12 @@
 using System.Collections.Concurrent;
 using System.Linq;
 using System.Threading;
-using Microsoft.ML;
 using Microsoft.ML.Data;
 using Microsoft.ML.Internal.Utilities;
 using System.Threading.Tasks;
 using Microsoft.ML.Runtime;
 
-namespace Microsoft.MachineLearning.DotNetBridge
+namespace Microsoft.ML.DotNetBridge
 {
     public unsafe static partial class Bridge
     {
diff --git a/src/DotNetBridge/RmlEnvironment.cs b/src/DotNetBridge/RmlEnvironment.cs
index 36f44240..dc9ff045 100644
--- a/src/DotNetBridge/RmlEnvironment.cs
+++ b/src/DotNetBridge/RmlEnvironment.cs
@@ -5,10 +5,9 @@
 using System;
 using System.Globalization;
 
-using Microsoft.ML;
 using Microsoft.ML.Runtime;
 
-namespace Microsoft.MachineLearning.DotNetBridge
+namespace Microsoft.ML.DotNetBridge
 {
     internal class RmlEnvironment : HostEnvironmentBase<RmlEnvironment>
     {
diff --git a/src/DotNetBridge/RunGraph.cs b/src/DotNetBridge/RunGraph.cs
index 932a8aa6..d97c3249 100644
--- a/src/DotNetBridge/RunGraph.cs
+++ b/src/DotNetBridge/RunGraph.cs
@@ -9,8 +9,6 @@
 using System.IO;
 using System.Linq;
 using Microsoft.DataPrep.Common;
-using Microsoft.ML;
-using Microsoft.ML.CommandLine;
 using Microsoft.ML.Data;
 using Microsoft.ML.Data.IO;
 using Microsoft.ML.EntryPoints;
@@ -20,7 +18,7 @@
 using Newtonsoft.Json;
 using Newtonsoft.Json.Linq;
 
-namespace Microsoft.MachineLearning.DotNetBridge
+namespace Microsoft.ML.DotNetBridge
 {
     public unsafe static partial class Bridge
     {
diff --git a/src/NativeBridge/UnixInterface.h b/src/NativeBridge/UnixInterface.h
index f5e88099..bb2c7fd5 100644
--- a/src/NativeBridge/UnixInterface.h
+++ b/src/NativeBridge/UnixInterface.h
@@ -24,7 +24,7 @@
 #define CORECLR_SHUTDOWN "coreclr_shutdown"
 
 #define DOTNETBRIDGE "DotNetBridge"
-#define DOTNETBRIDGE_FQDN "Microsoft.MachineLearning.DotNetBridge.Bridge"
+#define DOTNETBRIDGE_FQDN "Microsoft.ML.DotNetBridge.Bridge"
 
 #define GET_FN "GetFn"
diff --git a/src/NativeBridge/WinInterface.h b/src/NativeBridge/WinInterface.h
index 4f5238db..3548b578 100644
--- a/src/NativeBridge/WinInterface.h
+++ b/src/NativeBridge/WinInterface.h
@@ -302,7 +302,7 @@ class WinMlNetInterface
         HRESULT hr = host->CreateDelegate(
             _domainId,
             W("DotNetBridge"),
-            W("Microsoft.MachineLearning.DotNetBridge.Bridge"),
+            W("Microsoft.ML.DotNetBridge.Bridge"),
             W("GetFn"),
             &getter);
         if (FAILED(hr))
diff --git a/src/python/nimbusml.pyproj b/src/python/nimbusml.pyproj
index 7bd2bf28..49d2dfae 100644
--- a/src/python/nimbusml.pyproj
+++ b/src/python/nimbusml.pyproj
@@ -91,6 +91,8 @@
+
+
@@ -299,6 +301,7 @@
+
@@ -386,6 +389,7 @@
+
@@ -633,6 +637,7 @@
+
@@ -669,10 +674,12 @@
+
+
diff --git a/src/python/nimbusml/examples/PrefixColumnConcatenator.py b/src/python/nimbusml/examples/PrefixColumnConcatenator.py
new file mode 100644
index 00000000..b11ddb02
--- /dev/null
+++ b/src/python/nimbusml/examples/PrefixColumnConcatenator.py
@@ -0,0 +1,25 @@
+###############################################################################
+# PrefixColumnConcatenator
+import numpy as np
+import pandas as pd
+from nimbusml.preprocessing.schema import PrefixColumnConcatenator
+
+data = pd.DataFrame(
+    data=dict(
+        PrefixA=[2.5, np.nan, 2.1, 1.0],
+        PrefixB=[.75, .9, .8, .76],
+        AnotherColumn=[np.nan, 2.5, 2.6, 2.4]))
+
+# transform usage
+xf = PrefixColumnConcatenator(columns={'combined': 'Prefix'})
+
+# fit and transform
+features = xf.fit_transform(data)
+
+# print features
+print(features.head())
+#    PrefixA  PrefixB  AnotherColumn  combined.PrefixA  combined.PrefixB
+# 0      2.5     0.75            NaN               2.5              0.75
+# 1      NaN     0.90            2.5               NaN              0.90
+# 2      2.1     0.80            2.6               2.1              0.80
+# 3      1.0     0.76            2.4               1.0              0.76
\ No newline at end of file
diff --git a/src/python/nimbusml/examples/examples_from_dataframe/PrefixColumnConcatenator_df.py b/src/python/nimbusml/examples/examples_from_dataframe/PrefixColumnConcatenator_df.py
new file mode 100644
index 00000000..022e014a
--- /dev/null
+++ b/src/python/nimbusml/examples/examples_from_dataframe/PrefixColumnConcatenator_df.py
@@ -0,0 +1,31 @@
+###############################################################################
+# PrefixColumnConcatenator
+import numpy as np
+import pandas as pd
+from nimbusml import Pipeline, Role
+from nimbusml.datasets import get_dataset
+from nimbusml.linear_model import LogisticRegressionClassifier
+from nimbusml.preprocessing.schema import PrefixColumnConcatenator
+from nimbusml.preprocessing.schema import ColumnDropper
+from sklearn.model_selection import train_test_split
+
+# use the 'iris' data set to create test and train data
+#    Sepal_Length  Sepal_Width  Petal_Length  Petal_Width  Label  Species  Setosa
+# 0           5.1          3.5           1.4          0.2      0   setosa     1.0
+# 1           4.9          3.0           1.4          0.2      0   setosa     1.0
+df = get_dataset("iris").as_df()
+
+X_train, X_test, y_train, y_test = \
+    train_test_split(df.loc[:, df.columns != 'Label'], df['Label'])
+
+concat = PrefixColumnConcatenator() << {'Sepal': 'Sepal_'}
+concat1 = PrefixColumnConcatenator() << {'Petal': 'Petal_'}
+dropcols = ColumnDropper() << ['Sepal_Length', 'Sepal_Width', 'Petal_Length',
+                               'Petal_Width', 'Setosa', 'Species']
+
+pipeline = Pipeline([concat, concat1, dropcols, LogisticRegressionClassifier()])
+pipeline.fit(X_train, y_train)
+
+# Evaluate the model
+metrics, scores = pipeline.test(X_test, y_test, output_scores=True)
+print(metrics)
diff --git a/src/python/nimbusml/internal/core/preprocessing/schema/prefixcolumnconcatenator.py b/src/python/nimbusml/internal/core/preprocessing/schema/prefixcolumnconcatenator.py
new file mode 100644
index 00000000..d202e947
--- /dev/null
+++ b/src/python/nimbusml/internal/core/preprocessing/schema/prefixcolumnconcatenator.py
@@ -0,0 +1,100 @@
+# --------------------------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+# --------------------------------------------------------------------------------------------
+"""
+PrefixColumnConcatenator
+"""
+
+__all__ = ["PrefixColumnConcatenator"]
+
+
+from ....entrypoints.transforms_prefixcolumnconcatenator import \
+    transforms_prefixcolumnconcatenator
+from ....utils.utils import trace
+from ...base_pipeline_item import BasePipelineItem, DefaultSignature
+
+
+class PrefixColumnConcatenator(BasePipelineItem, DefaultSignature):
+    """
+
+    Combines several columns into a single vector-valued column by prefix.
+
+    .. remarks::
+        ``PrefixColumnConcatenator`` creates a single vector-valued column
+        from multiple columns. It can be performed on data before training
+        a model. The concatenation can significantly speed up the
+        processing of data when the number of columns is as large as
+        hundreds to thousands.
+
+    :param params: Additional arguments sent to compute engine.
+
+    .. seealso::
+        :py:class:`ColumnDropper
+        <nimbusml.preprocessing.schema.ColumnDropper>`,
+        :py:class:`ColumnSelector
+        <nimbusml.preprocessing.schema.ColumnSelector>`.
+
+    .. index:: transform, schema
+
+    Example:
+        .. literalinclude:: /../nimbusml/examples/PrefixColumnConcatenator.py
+              :language: python
+    """
+
+    @trace
+    def __init__(
+            self,
+            **params):
+        BasePipelineItem.__init__(
+            self, type='transform', **params)
+
+    @property
+    def _entrypoint(self):
+        return transforms_prefixcolumnconcatenator
+
+    @trace
+    def _get_node(self, **all_args):
+
+        input_columns = self.input
+        if input_columns is None and 'input' in all_args:
+            input_columns = all_args['input']
+        if 'input' in all_args:
+            all_args.pop('input')
+
+        output_columns = self.output
+        if output_columns is None and 'output' in all_args:
+            output_columns = all_args['output']
+        if 'output' in all_args:
+            all_args.pop('output')
+
+        # validate input
+        if input_columns is None:
+            raise ValueError(
+                "'None' input passed when it cannot be none.")
+
+        if not isinstance(input_columns, list):
+            raise ValueError(
+                "input has to be a list of strings, instead got %s" %
+                type(input_columns))
+
+        # validate output
+        if output_columns is None:
+            raise ValueError(
+                "'None' output passed when it cannot be none.")
+
+        if not isinstance(output_columns, list):
+            raise ValueError(
+                "output has to be a list of strings, instead got %s" %
+                type(output_columns))
+
+        algo_args = dict(
+            column=[
+                dict(
+                    Source=i, Name=o) for i, o in zip(
+                    input_columns, output_columns)] if input_columns else None)
+
+        all_args.update(algo_args)
+        return self._entrypoint(**all_args)
diff --git a/src/python/nimbusml/internal/entrypoints/transforms_prefixcolumnconcatenator.py b/src/python/nimbusml/internal/entrypoints/transforms_prefixcolumnconcatenator.py
new file mode 100644
index 00000000..cfe672b7
--- /dev/null
+++ b/src/python/nimbusml/internal/entrypoints/transforms_prefixcolumnconcatenator.py
@@ -0,0 +1,64 @@
+"""
+Transforms.PrefixColumnConcatenator
+"""
+
+
+from ..utils.entrypoints import EntryPoint
+from ..utils.utils import try_set, unlist
+
+
+def transforms_prefixcolumnconcatenator(
+        column,
+        data,
+        output_data=None,
+        model=None,
+        **params):
+    """
+    **Description**
+        Concatenates one or more columns of the same item type by prefix.
+
+    :param column: New column definition(s) (optional form:
+        name:srcs) (inputs).
+    :param data: Input dataset (inputs).
+    :param output_data: Transformed dataset (outputs).
+    :param model: Transform model (outputs).
+    """
+
+    entrypoint_name = 'Transforms.PrefixColumnConcatenator'
+    inputs = {}
+    outputs = {}
+
+    if column is not None:
+        inputs['Column'] = try_set(
+            obj=column,
+            none_acceptable=False,
+            is_of_type=list,
+            is_column=True)
+    if data is not None:
+        inputs['Data'] = try_set(
+            obj=data,
+            none_acceptable=False,
+            is_of_type=str)
+    if output_data is not None:
+        outputs['OutputData'] = try_set(
+            obj=output_data,
+            none_acceptable=False,
+            is_of_type=str)
+    if model is not None:
+        outputs['Model'] = try_set(
+            obj=model,
+            none_acceptable=False,
+            is_of_type=str)
+
+    input_variables = {
+        x for x in unlist(inputs.values())
+        if isinstance(x, str) and x.startswith("$")}
+    output_variables = {
+        x for x in unlist(outputs.values())
+        if isinstance(x, str) and x.startswith("$")}
+
+    entrypoint = EntryPoint(
+        name=entrypoint_name, inputs=inputs, outputs=outputs,
+        input_variables=input_variables,
+        output_variables=output_variables)
+    return entrypoint
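
Note: the core wrapper's _get_node above passes column=[dict(Source=..., Name=...)], where Source carries a prefix rather than a list of columns. A minimal sketch of invoking this builder directly with graph variables, following the conventions at the call sites in this PR (illustration only, not an official snippet):

from nimbusml.internal.entrypoints.transforms_prefixcolumnconcatenator import \
    transforms_prefixcolumnconcatenator

# '$'-prefixed strings are graph variables, as used elsewhere in nimbusml;
# 'Source' is interpreted as a prefix by the C# entrypoint.
node = transforms_prefixcolumnconcatenator(
    column=[dict(Source='Sepal_', Name='Sepal')],
    data='$input_data',
    output_data='$output_data',
    model='$output_model')
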
diff --git a/src/python/nimbusml/internal/utils/data_schema.py b/src/python/nimbusml/internal/utils/data_schema.py
index 0fb409e1..8cd0ca1d 100644
--- a/src/python/nimbusml/internal/utils/data_schema.py
+++ b/src/python/nimbusml/internal/utils/data_schema.py
@@ -655,7 +655,7 @@ def handle_file(filename):
         graph = Graph(*(graph_nodes), inputs=dict(file=filename),
                       outputs=dict(data=''))
         st = FileDataStream(filename, schema=None)
-        (out_model, out_data, out_metrics) = graph.run(verbose=True,
+        (out_model, out_data, out_metrics, _) = graph.run(verbose=True,
                                                        X=st)
 
         if isinstance(filepath_or_buffer, StringIO):
diff --git a/src/python/nimbusml/internal/utils/data_stream.py b/src/python/nimbusml/internal/utils/data_stream.py
index 675ba148..128d71b1 100644
--- a/src/python/nimbusml/internal/utils/data_stream.py
+++ b/src/python/nimbusml/internal/utils/data_stream.py
@@ -428,7 +428,7 @@ def to_df(self):
             dict(
                 data=''), dict(
                 output_data=''), DataOutputFormat.DF, *(graph_nodes))
-        (out_model, out_data, out_metrics) = graph.run(verbose=True, X=self)
+        (out_model, out_data, out_metrics, _) = graph.run(verbose=True, X=self)
         return out_data
 
     def head(self, n=5, skip=0):
@@ -457,7 +457,7 @@
             dict(
                 data=''), dict(
                 output_data=''), DataOutputFormat.DF, *(graph_nodes))
-        (out_model, out_data, out_metrics) = graph.run(verbose=True, X=self)
+        (out_model, out_data, out_metrics, _) = graph.run(verbose=True, X=self)
         return out_data
 
     def clone(self):
diff --git a/src/python/nimbusml/internal/utils/entrypoints.py b/src/python/nimbusml/internal/utils/entrypoints.py
index 28127b1d..cc290e4b 100644
--- a/src/python/nimbusml/internal/utils/entrypoints.py
+++ b/src/python/nimbusml/internal/utils/entrypoints.py
@@ -269,7 +269,8 @@ def _try_call_bridge(
             call_parameters,
             verbose,
             concatenated,
-            output_modelfilename):
+            output_modelfilename,
+            output_predictor_modelfilename=None):
         try:
             ret = px_call(call_parameters)
         except RuntimeError as e:
@@ -320,6 +321,7 @@ def run(self, X, y=None, max_slots=-1, random_state=None, verbose=1, **params):
             return 'graph = %s' % (str(self))
 
         output_modelfilename = None
+        output_predictor_modelfilename = None
        output_metricsfilename = None
 
         out_metrics = None
@@ -391,6 +393,11 @@ def remove_multi_level_index(c):
             output_modelfilename = _get_temp_file(suffix='.model.bin')
             self.outputs['output_model'] = output_modelfilename
 
+        # set graph output predictor model to temp file
+        if 'output_predictor_model' in self.outputs:
+            output_predictor_modelfilename = _get_temp_file(suffix='.predictor.model.bin')
+            self.outputs['output_predictor_model'] = output_predictor_modelfilename
+
         # set graph output metrics to temp file
         if 'output_metrics' in self.outputs:
             output_metricsfilename = _get_temp_file(suffix='.txt')
@@ -435,7 +442,8 @@ def remove_multi_level_index(c):
                 call_parameters,
                 verbose,
                 concatenated,
-                output_modelfilename)
+                output_modelfilename,
+                output_predictor_modelfilename)
 
             out_data = None
 
@@ -466,16 +474,13 @@ def remove_multi_level_index(c):
                 return self._process_graph_run_results(out_data)
             elif self._data_output_format == DataOutputFormat.IDV:
                 output = BinaryDataStream(output_idvfilename)
-                return (output_modelfilename, output, out_metrics)
+                return (output_modelfilename, output, out_metrics, output_predictor_modelfilename)
             else:
-                return (output_modelfilename, out_data, out_metrics)
+                return (output_modelfilename, out_data, out_metrics, output_predictor_modelfilename)
         finally:
             if cv:
                 self._remove_temp_files()
             else:
-                if output_modelfilename:
-                    # os.remove(output_modelfilename)
-                    pass
                 if output_metricsfilename:
                     os.remove(output_metricsfilename)
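
Note: the tuple-returning paths of Graph.run now yield a 4-tuple, so every call site must unpack four values even when no predictor model was requested (the fourth element is then None). The updated call sites in data_schema.py, data_stream.py, and pipeline.py all follow this pattern; a minimal illustration of the new contract (sketch, with X standing in for whatever input the caller already passes):

# New contract introduced by this diff: predictor-model path comes last.
(out_model, out_data, out_metrics, out_predictor_model) = graph.run(X=X)

# Call sites that do not need the predictor model simply discard it:
(out_model, out_data, out_metrics, _) = graph.run(X=X)
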
diff --git a/src/python/nimbusml/pipeline.py b/src/python/nimbusml/pipeline.py
index 1cbd0674..a19a0188 100644
--- a/src/python/nimbusml/pipeline.py
+++ b/src/python/nimbusml/pipeline.py
@@ -655,8 +655,7 @@ def _update_graph_nodes_for_learner(
             else:
                 raise NotImplementedError(
                     "Strategy '{0}' to handle unspecified inputs is not "
-                    "implemented".format(
-                        strategy_iosklearn))
+                    "implemented".format(strategy_iosklearn))
 
         if label_column is not None or last_node._use_role(Role.Label):
             if getattr(last_node, 'label_column_name_', None):
@@ -679,8 +678,7 @@
                 last_node.label_column_name = None
                 label_column = None
 
-        if weight_column is not None or last_node._use_role(
-                Role.Weight):
+        if weight_column is not None or last_node._use_role(Role.Weight):
             if getattr(last_node, 'example_weight_column_name', None):
                 weight_column = last_node.example_weight_column_name
             elif weight_column:
@@ -692,8 +690,7 @@
             if (hasattr(last_node, 'row_group_column_name_') and
                     last_node.row_group_column_name_ is not None):
                 group_id_column = last_node.row_group_column_name_
-            elif (hasattr(last_node,
-                          'row_group_column_name') and
+            elif (hasattr(last_node, 'row_group_column_name') and
                     last_node.row_group_column_name is not None):
                 group_id_column = last_node.row_group_column_name
             else:
@@ -715,10 +712,8 @@
         # todo: ideally all the nodes have the same name for params
         # so we dont have to distinguish if its learner or
-        # transformer. We will supply
-        # input_data, output_data & output_model vars. Its up to
-        # node to
-        # use suplied vars
+        # transformer. We will supply input_data, output_data and
+        # output_model vars. It's up to the node to use the supplied vars.
         learner_node = last_node._get_node(
             feature_column_name=learner_features,
             training_data=output_data,
@@ -745,6 +740,7 @@ def _fit_graph(self, X, y, verbose, **params):
         output_binary_data_stream = params.pop(
             'output_binary_data_stream', False)
         params.pop('parallel', None)
+        do_output_predictor_model = params.pop('output_predictor_model', None)
 
         X, y, columns_renamed, feature_columns, label_column, schema, \
             weights, weight_column = self._preprocess_X_y(X, y, weights)
@@ -759,6 +755,7 @@
         input_data = "$input_data"
         output_data = "$output_data"
         output_model = "$output_model"
+        output_predictor_model = "$output_predictor_model"
         predictor_model = "$predictor_model"
 
         graph_nodes, feature_columns, inputs, transform_nodes, \
@@ -773,10 +770,13 @@
             self._update_graph_nodes_for_learner(
                 graph_nodes,
                 transform_nodes,
-                columns_out, label_column,
+                columns_out,
+                label_column,
                 weight_column,
-                output_data, output_model,
-                predictor_model, y,
+                output_data,
+                output_model,
+                predictor_model,
+                y,
                 strategy_iosklearn=strategy_iosklearn)
 
         # graph_nodes contain graph sections, which is needed for CV.
@@ -792,17 +792,27 @@
                 transform_models.append(node.inputs['TransformModel'])
             elif "Model" in node.outputs:
                 transform_models.append(node.outputs["Model"])
-
-        if learner_node and len(
-                transform_models) > 0:  # no need to combine if there is
-            # only 1 model
+        # no need to combine if there is only 1 model
+        if learner_node and len(transform_models) > 0:
             combine_model_node = transforms_manyheterogeneousmodelcombiner(
                 transform_models=transform_models,
-                predictor_model=(
-                    predictor_model if learner_node else None),
+                predictor_model=predictor_model,
                 model=output_model)
             combine_model_node._implicit = True
             graph_nodes.append(combine_model_node)
+            if do_output_predictor_model:
+                # get implicit_nodes and build predictor model only
+                implicit_nodes = graph_sections['implicit_nodes']
+                implicit_transform_models = []
+                for node in implicit_nodes:
+                    if "Model" in node.outputs:
+                        implicit_transform_models.append(node.outputs["Model"])
+                output_predictor_model_node = transforms_manyheterogeneousmodelcombiner(
+                    transform_models=implicit_transform_models,
+                    predictor_model=predictor_model,
+                    model=output_predictor_model)
+                output_predictor_model_node._implicit = True
+                graph_nodes.append(output_predictor_model_node)
         elif len(transform_models) > 1:
             combine_model_node = transforms_modelcombiner(
                 models=transform_models,
@@ -811,12 +821,13 @@
             graph_nodes.append(combine_model_node)
         elif len(graph_nodes) == 0:
             raise RuntimeError(
-                "Unable to process the pipeline len(transform_models)={"
-                "0}.".format(
-                    len(transform_models)))
+                "Unable to process the pipeline len(transform_models)={0}.".
+                format(len(transform_models)))
 
         # create the graph
         outputs = OrderedDict([(output_model.replace('$', ''), '')])
+        if do_output_predictor_model:
+            outputs[output_predictor_model.replace('$', '')] = ''
         # REVIEW: ideally we should remove output completely from the
         # graph if its not needed
         # however graph validation logic prevents doing that at the moment,
@@ -1154,10 +1165,9 @@ def move_information_about_roles_once_used():
 
         # run the graph
         # REVIEW: we should have the possibility to keep the model in
-        # memory
-        # and not in a file.
+        # memory and not in a file.
         try:
-            (out_model, out_data, out_metrics) = graph.run(
+            (out_model, out_data, out_metrics, out_predictor_model) = graph.run(
                 X=X,
                 y=y,
                 random_state=self.random_state,
@@ -1183,6 +1193,8 @@
             move_information_about_roles_once_used()
             self.graph_ = graph
             self.model = out_model
+            if out_predictor_model:
+                self.predictor_model = out_predictor_model
             self.data = out_data
             # stop the clock
             self._run_time = time.time() - start_time
@@ -1394,9 +1406,7 @@ def _process_learner(
             optional_node = transforms_optionalcolumncreator(
                 column=[label],
                 data="$input_data" if num_transforms == 0 else
-                output_data +
-                str(
-                    num_transforms),
+                output_data + str(num_transforms),
                 output_data="$optional_data",
                 model=output_model + str(num_transforms + 1))
             optional_node._implicit = True
@@ -1404,24 +1414,20 @@
                 data="$optional_data",
                 label_column=label,
                 output_data="$label_data",
-                model=output_model + str(
-                    num_transforms + 2))
+                model=output_model + str(num_transforms + 2))
             label_node._implicit = True
             feature_node = transforms_featurecombiner(
                 data="$label_data",
                 features=features,
                 output_data=output_data,
-                model=output_model + str(
-                    num_transforms + 3))
+                model=output_model + str(num_transforms + 3))
             feature_node._implicit = True
             implicit_nodes = [optional_node, label_node, feature_node]
         elif learner.type in ('classifier', 'ranker'):
             optional_node = transforms_optionalcolumncreator(
                 column=[label],
                 data="$input_data" if num_transforms == 0 else
-                output_data +
-                str(
-                    num_transforms),
+                output_data + str(num_transforms),
                 output_data="$optional_data",
                 model=output_model + str(num_transforms + 1))
             optional_node._implicit = True
@@ -1432,25 +1438,20 @@
                 text_key_values=False,
                 model=output_model + str(num_transforms + 2))
             label_node._implicit = True
-
             feature_node = transforms_featurecombiner(
                 data="$label_data",
                 features=features,
                 output_data=output_data,
-                model=output_model + str(
-                    num_transforms + 3))
+                model=output_model + str(num_transforms + 3))
             feature_node._implicit = True
             implicit_nodes = [optional_node, label_node, feature_node]
         elif learner.type in {'recommender', 'sequence'}:
-            raise NotImplementedError(
-                "Type '{0}' is not implemented yet.".format(
-                    learner.type))
+            raise NotImplementedError("Type '{0}' is not implemented yet.".
+                                      format(learner.type))
         else:
             feature_node = transforms_featurecombiner(
                 data="$input_data" if num_transforms == 0 else
-                output_data +
-                str(
-                    num_transforms),
+                output_data + str(num_transforms),
                 features=features,
                 output_data=output_data,
                 model=output_model + str(num_transforms + 1))
@@ -1796,7 +1797,7 @@ def get_feature_contributions(self, X, top=10, bottom=10, verbose=0,
         telemetry_info = ".".join([class_name, method_name])
 
         try:
-            (out_model, out_data, out_metrics) = graph.run(
+            (out_model, out_data, out_metrics, _) = graph.run(
                 X=X,
                 random_state=self.random_state,
                 model=self.model,
@@ -1928,7 +1929,7 @@ def _predict(self, X, y=None,
         telemetry_info = ".".join([class_name, method_name])
 
         try:
-            (out_model, out_data, out_metrics) = graph.run(
+            (out_model, out_data, out_metrics, _) = graph.run(
                 X=X,
                 y=y,
                 random_state=self.random_state,
@@ -2273,7 +2274,7 @@ def transform(
         max_slots = params.pop('max_slots', -1)
 
         try:
-            (out_model, out_data, out_metrics) = graph.run(
+            (out_model, out_data, out_metrics, _) = graph.run(
                 X=X,
                 random_state=self.random_state,
                 model=self.model,
@@ -2344,7 +2345,7 @@ def summary(self, verbose=0, **params):
         telemetry_info = ".".join([class_name, method_name])
 
         try:
-            (_, summary_data, _) = graph.run(
+            (_, summary_data, _, _) = graph.run(
                 X=None,
                 y=None,
                 random_state=self.random_state,
@@ -2594,7 +2595,7 @@ def combine_models(cls, *items, **params):
         telemetry_info = ".".join([class_name, method_name])
 
         try:
-            (out_model, _, _) = graph.run(
+            (out_model, _, _, _) = graph.run(
                 X=None,
                 y=None,
                 random_state=None,
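
Note: taken together, the pipeline.py changes above let a caller ask fit() for a second, featurizer-free model in a single pass. A short sketch of the resulting API, reusing the toy frame from the tests further down (the keyword and attribute names are exactly those introduced in this diff):

import pandas as pd
from nimbusml import Pipeline
from nimbusml.feature_extraction.categorical import OneHotVectorizer
from nimbusml.linear_model import OnlineGradientDescentRegressor

train_df = pd.DataFrame({'c0': ['a', 'b', 'a', 'b'],
                         'c2': [2.0, 3.0, 4.0, 5.0]})

pipeline = Pipeline([OneHotVectorizer() << 'c0',
                     OnlineGradientDescentRegressor(label='c2')])

# fit() pops 'output_predictor_model' from **params; when truthy, the graph
# gains an extra model-combiner node and the path of the predictor-only
# model is stored on the pipeline.
pipeline.fit(train_df, output_predictor_model=True)

predictor_only = Pipeline()
predictor_only.load_model(pipeline.predictor_model)
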
diff --git a/src/python/nimbusml/preprocessing/schema/__init__.py b/src/python/nimbusml/preprocessing/schema/__init__.py
index 9f8ecfe4..51380e72 100644
--- a/src/python/nimbusml/preprocessing/schema/__init__.py
+++ b/src/python/nimbusml/preprocessing/schema/__init__.py
@@ -2,6 +2,7 @@
 from .columndropper import ColumnDropper
 from .columnduplicator import ColumnDuplicator
 from .columnselector import ColumnSelector
+from .prefixcolumnconcatenator import PrefixColumnConcatenator
 from .typeconverter import TypeConverter
 
 __all__ = [
@@ -9,5 +10,6 @@
     'ColumnDropper',
     'ColumnDuplicator',
     'ColumnSelector',
+    'PrefixColumnConcatenator',
     'TypeConverter'
 ]
diff --git a/src/python/nimbusml/preprocessing/schema/prefixcolumnconcatenator.py b/src/python/nimbusml/preprocessing/schema/prefixcolumnconcatenator.py
new file mode 100644
index 00000000..9a3aa443
--- /dev/null
+++ b/src/python/nimbusml/preprocessing/schema/prefixcolumnconcatenator.py
@@ -0,0 +1,86 @@
+# --------------------------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+# --------------------------------------------------------------------------------------------
+"""
+PrefixColumnConcatenator
+"""
+
+__all__ = ["PrefixColumnConcatenator"]
+
+
+from sklearn.base import TransformerMixin
+
+from ...base_transform import BaseTransform
+from ...internal.core.preprocessing.schema.prefixcolumnconcatenator import \
+    PrefixColumnConcatenator as core
+from ...internal.utils.utils import trace
+
+
+class PrefixColumnConcatenator(core, BaseTransform, TransformerMixin):
+    """
+
+    Combines several columns into a single vector-valued column by prefix.
+
+    .. remarks::
+        ``PrefixColumnConcatenator`` creates a single vector-valued column
+        from multiple columns. It can be performed on data before training
+        a model. The concatenation can significantly speed up the
+        processing of data when the number of columns is as large as
+        hundreds to thousands.
+
+    :param columns: a dictionary of key-value pairs, where the key is the
+        output column name and the value is a prefix; every input column
+        whose name starts with that prefix becomes a source of the output
+        column.
+
+        * Multiple key-value pairs are allowed.
+        * Input column type: numeric or string.
+        * Output column type:
+         `Vector Type `_.
+
+        The << operator can be used to set this value (see
+        `Column Operator `_)
+
+        For example
+         * PrefixColumnConcatenator(columns={'combined': 'Prefix'})
+         * PrefixColumnConcatenator() << {'combined': 'Prefix'}
+
+        For more details see `Columns `_.
+
+    :param params: Additional arguments sent to compute engine.
+
+    .. seealso::
+        :py:class:`ColumnDropper
+        <nimbusml.preprocessing.schema.ColumnDropper>`,
+        :py:class:`ColumnSelector
+        <nimbusml.preprocessing.schema.ColumnSelector>`.
+
+    .. index:: transform, schema
+
+    Example:
+        .. literalinclude:: /../nimbusml/examples/PrefixColumnConcatenator.py
+              :language: python
+    """
+
+    @trace
+    def __init__(
+            self,
+            columns=None,
+            **params):
+
+        if columns:
+            params['columns'] = columns
+        BaseTransform.__init__(self, **params)
+        core.__init__(
+            self,
+            **params)
+        self._columns = columns
+
+    def get_params(self, deep=False):
+        """
+        Get the parameters for this operator.
+        """
+        return core.get_params(self)
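
Note: because PrefixColumnConcatenator resolves its sources from the schema at fit time, it can stand in for ColumnConcatenator without enumerating every slot. A sketch of the equivalence on the iris columns used throughout this PR:

from nimbusml.preprocessing.schema import ColumnConcatenator, PrefixColumnConcatenator

# Explicit slot list:
concat = ColumnConcatenator(columns={'Sepal': ['Sepal_Length', 'Sepal_Width']})

# Same output column, but the sources are discovered from the prefix, so the
# transform keeps working if additional 'Sepal_*' columns appear later:
prefix_concat = PrefixColumnConcatenator(columns={'Sepal': 'Sepal_'})
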
diff --git a/src/python/nimbusml/tests/pipeline/test_pipeline_split_models.py b/src/python/nimbusml/tests/pipeline/test_pipeline_split_models.py
new file mode 100644
index 00000000..bc1399bf
--- /dev/null
+++ b/src/python/nimbusml/tests/pipeline/test_pipeline_split_models.py
@@ -0,0 +1,172 @@
+# --------------------------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+# --------------------------------------------------------------------------------------------
+import os
+import unittest
+
+import numpy as np
+import pandas as pd
+from nimbusml import Pipeline, FileDataStream
+from nimbusml.datasets import get_dataset
+from nimbusml.feature_extraction.categorical import OneHotVectorizer
+from nimbusml.linear_model import LogisticRegressionBinaryClassifier, OnlineGradientDescentRegressor
+from nimbusml.preprocessing.filter import RangeFilter
+from nimbusml.preprocessing.schema import ColumnConcatenator, PrefixColumnConcatenator
+
+seed = 0
+
+train_data = {'c0': ['a', 'b', 'a', 'b'],
+              'c1': [1, 2, 3, 4],
+              'c2': [2, 3, 4, 5]}
+train_df = pd.DataFrame(train_data).astype({'c1': np.float64,
+                                            'c2': np.float64})
+
+test_data = {'c0': ['a', 'b', 'b'],
+             'c1': [1.5, 2.3, 3.7],
+             'c2': [2.2, 4.9, 2.7]}
+test_df = pd.DataFrame(test_data).astype({'c1': np.float64,
+                                          'c2': np.float64})
+
+
+class TestPipelineSplitModels(unittest.TestCase):
+
+    def test_notvectorized_output_predictor_model(self):
+        """
+        This test verifies that the outputted predictor model from the
+        combined (with featurizers) pipeline runs successfully
+        on featurized data with no vectors.
+        """
+        df = train_df.drop(['c0'], axis=1)
+
+        # Create and fit a RangeFilter transform using the training
+        # data and use it to transform the training data.
+        transform_pipeline = Pipeline([RangeFilter(min=0.0, max=4.5) << 'c2'], random_state=seed)
+        transform_pipeline.fit(df)
+        df1 = transform_pipeline.transform(df)
+
+        # Create and fit a combined model and output the predictor model separately.
+        combined_pipeline = Pipeline([RangeFilter(min=0.0, max=4.5) << 'c2',
+                                      OnlineGradientDescentRegressor(label='c2')],
+                                     random_state=seed)
+        combined_pipeline.fit(df, output_predictor_model=True)
+        result_1 = combined_pipeline.predict(df)
+
+        # Load predictor pipeline and score featurized data
+        predictor_pipeline = Pipeline()
+        predictor_pipeline.load_model(combined_pipeline.predictor_model)
+        result_2 = predictor_pipeline.predict(df1)
+
+        self.assertEqual(result_1.loc[0, 'Score'], result_2.loc[0, 'Score'])
+        self.assertEqual(result_1.loc[1, 'Score'], result_2.loc[1, 'Score'])
+
+    def test_vectorized_output_predictor_model(self):
+        """
+        This test shows that the outputted predictor model from the
+        combined (with featurizers) pipeline fails to run
+        on featurized data with vectors.
+        """
+
+        # Create and fit a OneHotVectorizer transform using the
+        # training data and use it to transform the training data.
+        transform_pipeline = Pipeline([OneHotVectorizer() << 'c0'], random_state=seed)
+        transform_pipeline.fit(train_df)
+        df = transform_pipeline.transform(train_df)
+
+        # Create and fit a combined model and output the predictor model separately.
+        combined_pipeline = Pipeline([OneHotVectorizer() << 'c0',
+                                      OnlineGradientDescentRegressor(label='c2')],
+                                     random_state=seed)
+        combined_pipeline.fit(train_df, output_predictor_model=True)
+        result_1 = combined_pipeline.predict(train_df)
+
+        # Load predictor pipeline and score featurized data
+        predictor_pipeline = Pipeline()
+        predictor_pipeline.load_model(combined_pipeline.predictor_model)
+
+        try:
+            # This does not work because the input schema doesn't
+            # match. The input schema looks for vector 'c0' with slots 'a,b'
+            # but the featurized data has only columns 'c0.a' and 'c0.b'.
+            predictor_pipeline.predict(df)
+
+        except Exception as e:
+            pass
+        else:
+            self.fail()
+
+    def test_vectorized_with_concat_output_predictor_model(self):
+        """
+        This test shows how to prepend a ColumnConcatenator transform to the
+        outputted predictor model from the combined (with featurizers) pipeline
+        so it successfully runs on featurized data with vectors.
+        """
+        # Create and fit a OneHotVectorizer transform using the
+        # training data and use it to transform the training data.
+        transform_pipeline = Pipeline([OneHotVectorizer() << 'c0'], random_state=seed)
+        transform_pipeline.fit(train_df)
+        df = transform_pipeline.transform(train_df)
+
+        # Create, fit and score with combined model.
+        # Output predictor model separately.
+        combined_pipeline = Pipeline([OneHotVectorizer() << 'c0',
+                                      OnlineGradientDescentRegressor(label='c2')],
+                                     random_state=seed)
+        combined_pipeline.fit(train_df, output_predictor_model=True)
+        result_1 = combined_pipeline.predict(train_df)
+
+        # train ColumnConcatenator on featurized data
+        concat_pipeline = Pipeline([ColumnConcatenator(columns={'c0': ['c0.a', 'c0.b']})])
+        concat_pipeline.fit(df)
+
+        # Load predictor pipeline
+        predictor_pipeline = Pipeline()
+        predictor_pipeline.load_model(combined_pipeline.predictor_model)
+
+        # combine concat and predictor models and score
+        combined_predictor_pipeline = Pipeline.combine_models(concat_pipeline,
+                                                              predictor_pipeline)
+        result_2 = combined_predictor_pipeline.predict(df)
+
+        self.assertEqual(result_1.loc[0, 'Score'], result_2.loc[0, 'Score'])
+        self.assertEqual(result_1.loc[1, 'Score'], result_2.loc[1, 'Score'])
+
+    def test_vectorized_with_prefixconcat_output_predictor_model(self):
+        """
+        This test shows how to prepend a PrefixColumnConcatenator transform to
+        the outputted predictor model from the combined (with featurizers)
+        pipeline so it successfully runs on featurized data with vectors.
+        """
+        # Create and fit a OneHotVectorizer transform using the
+        # training data and use it to transform the training data.
+        transform_pipeline = Pipeline([OneHotVectorizer() << 'c0'], random_state=seed)
+        transform_pipeline.fit(train_df)
+        df = transform_pipeline.transform(train_df)
+
+        # Create, fit and score with combined model.
+        # Output predictor model separately.
+        combined_pipeline = Pipeline([OneHotVectorizer() << 'c0',
+                                      OnlineGradientDescentRegressor(label='c2')],
+                                     random_state=seed)
+        combined_pipeline.fit(train_df, output_predictor_model=True)
+        result_1 = combined_pipeline.predict(train_df)
+
+        # train PrefixColumnConcatenator on featurized data
+        concat_pipeline = Pipeline([PrefixColumnConcatenator(columns={'c0': 'c0.'})])
+        concat_pipeline.fit(df)
+
+        # Load predictor pipeline
+        predictor_pipeline = Pipeline()
+        predictor_pipeline.load_model(combined_pipeline.predictor_model)
+
+        # combine concat and predictor models and score
+        combined_predictor_pipeline = Pipeline.combine_models(concat_pipeline,
+                                                              predictor_pipeline)
+        result_2 = combined_predictor_pipeline.predict(df)
+
+        self.assertEqual(result_1.loc[0, 'Score'], result_2.loc[0, 'Score'])
+        self.assertEqual(result_1.loc[1, 'Score'], result_2.loc[1, 'Score'])
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/src/python/nimbusml/tests/preprocessing/schema/test_prefixcolumnconcatenator.py b/src/python/nimbusml/tests/preprocessing/schema/test_prefixcolumnconcatenator.py
new file mode 100644
index 00000000..75471be3
--- /dev/null
+++ b/src/python/nimbusml/tests/preprocessing/schema/test_prefixcolumnconcatenator.py
@@ -0,0 +1,36 @@
+# --------------------------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+# --------------------------------------------------------------------------------------------
+
+import unittest
+
+from nimbusml import FileDataStream
+from nimbusml.datasets import get_dataset
+from nimbusml.preprocessing.schema import PrefixColumnConcatenator
+
+
+class TestPrefixColumnConcatenator(unittest.TestCase):
+
+    def test_prefix_columns_concatenator(self):
+        data = get_dataset('iris').as_df()
+        xf = PrefixColumnConcatenator(columns={'Spl': 'Sepal_', 'Pet': 'Petal_'})
+        features = xf.fit_transform(data)
+
+        assert features.shape == (150, 11)
+        assert set(features.columns) == {
+            'Sepal_Length',
+            'Sepal_Width',
+            'Petal_Length',
+            'Petal_Width',
+            'Label',
+            'Species',
+            'Setosa',
+            'Spl.Sepal_Length',
+            'Spl.Sepal_Width',
+            'Pet.Petal_Length',
+            'Pet.Petal_Width'}
+
+
+if __name__ == '__main__':
+    unittest.main()