diff --git a/sql/codegen_xgboost.go b/sql/codegen_xgboost.go
index ac9305a0e5..f54f5aa8ce 100644
--- a/sql/codegen_xgboost.go
+++ b/sql/codegen_xgboost.go
@@ -14,6 +14,7 @@
 package sql
 
 import (
+	"encoding/json"
 	"fmt"
 	"strconv"
 	"strings"
@@ -24,24 +25,13 @@ import (
 )
 
 type xgboostFiller struct {
-	isTrain        bool
-	standardSelect string
-	modelPath      string
+	modelPath string
 	xgboostFields
 	xgColumnFields
 	xgDataSourceFields
-	xgRuntimeFields
-}
-
-type xgRuntimeFields struct {
-	runLocal bool
-	xgRuntimeResourceFields
-}
-
-type xgRuntimeResourceFields struct {
-	WorkerNum  uint `json:"worker_num,omitempty"`
-	MemorySize uint `json:"memory_size,omitempty"`
-	CPUSize    uint `json:"cpu_size,omitempty"`
+	xgboostJSON      string
+	xgDataSourceJSON string
+	xgColumnJSON     string
 }
 
 type xgboostFields struct {
@@ -90,12 +80,15 @@ type xgFeatureFields struct {
 }
 
 type xgDataSourceFields struct {
+	IsTrain                bool             `json:"is_train,omitempty"`
+	StandardSelect         string           `json:"standard_select,omitempty"`
 	IsTensorFlowIntegrated bool             `json:"is_tf_integrated,omitempty"`
 	X                      []*xgFeatureMeta `json:"x,omitempty"`
 	LabelField             *xgFeatureMeta   `json:"label,omitempty"`
 	WeightField            *xgFeatureMeta   `json:"weight,omitempty"`
 	GroupField             *xgFeatureMeta   `json:"group,omitempty"`
 	xgDataBaseField        `json:"db_config,omitempty"`
+	WriteBatchSize         int              `json:"write_batch_size,omitempty"`
 }
 
 type xgDataBaseField struct {
@@ -239,12 +232,7 @@ func sListPartial(key string, ptrFn func(*xgboostFiller) *[]string) func(*map[st
 	}
 }
 
-var xgbAttrSetterMap = map[string]func(*map[string][]string, *xgboostFiller) error{
-	// runtime params
-	"run_local": boolPartial("run_local", func(r *xgboostFiller) *bool { return &(r.runLocal) }),
-	"workers":   uIntPartial("workers", func(r *xgboostFiller) *uint { return &(r.WorkerNum) }),
-	"memory":    uIntPartial("memory", func(r *xgboostFiller) *uint { return &(r.MemorySize) }),
-	"cpu":       uIntPartial("cpu", func(r *xgboostFiller) *uint { return &(r.CPUSize) }),
+var xgbTrainAttrSetterMap = map[string]func(*map[string][]string, *xgboostFiller) error{
 	// booster params
 	"objective": strPartial("objective", func(r *xgboostFiller) *string { return &(r.Objective) }),
 	"booster":   strPartial("booster", func(r *xgboostFiller) *string { return &(r.Booster) }),
@@ -262,6 +250,10 @@ var xgbAttrSetterMap = map[string]func(*map[string][]string, *xgboostFiller) err
 	// xgboost train controllers
 	"num_round":  uIntPartial("num_round", func(r *xgboostFiller) *uint { return &(r.NumRound) }),
 	"auto_train": boolPartial("auto_train", func(r *xgboostFiller) *bool { return &(r.AutoTrain) }),
+	// Label, Group, Weight and xgFeatureFields are parsed from columnClause
+}
+
+var xgbPredAttrSetterMap = map[string]func(*map[string][]string, *xgboostFiller) error{
 	// xgboost output columns (for prediction)
 	"append_columns": sListPartial("append_columns", func(r *xgboostFiller) *[]string { return &(r.AppendColumns) }),
 	"result_column":  strPartial("result_column", func(r *xgboostFiller) *string { return &(r.ResultColumn) }),
@@ -272,9 +264,16 @@ var xgbAttrSetterMap = map[string]func(*map[string][]string, *xgboostFiller) err
 }
 
 func xgParseAttr(pr *extendedSelect, r *xgboostFiller) error {
+	var rawAttrs map[string]*expr
+	if pr.train {
+		rawAttrs = pr.trainAttrs
+	} else {
+		rawAttrs = pr.predAttrs
+	}
+
 	// parse pr.attrs to map[string][]string
 	attrs := make(map[string][]string)
-	for k, exp := range pr.trainAttrs {
+	for k, exp := range rawAttrs {
 		strExp := exp.String()
 		if strings.HasPrefix(strExp, "[") && strings.HasSuffix(strExp, "]") {
 			attrs[k] = exp.cdr()
@@ -290,8 +289,14 @@ func xgParseAttr(pr *extendedSelect, r *xgboostFiller) error {
 	}
 
 	// fill xgboostFiller with attrs
+	var setterMap map[string]func(*map[string][]string, *xgboostFiller) error
+	if pr.train {
+		setterMap = xgbTrainAttrSetterMap
+	} else {
+		setterMap = xgbPredAttrSetterMap
+	}
 	for k := range attrs {
-		if setter, ok := xgbAttrSetterMap[k]; ok {
+		if setter, ok := setterMap[k]; ok {
 			if e := setter(&attrs, r); e != nil {
 				return xgParseAttrError(e)
 			}
@@ -309,15 +314,14 @@ func xgParseAttr(pr *extendedSelect, r *xgboostFiller) error {
 	return nil
 }
 
-/* parse feature column, which owned by default column target("feature_columns:), from AST(pr.columns)
-
-   For now, two schemas are supported:
-	1. sparse-kv
-		schema: COLUMN SPARSE([feature_column], [1-dim shape], [single char delimiter])
-		data example: COLUMN SPARSE("0:1.5 1:100.1f 11:-1.2", [20], " ")
-	2. tf feature columns
-		roughly same as TFEstimator, except output shape of feaColumns are required to be 1-dim.
-*/
+// parseFeatureColumns parses feature columns from the AST (pr.columns).
+// Feature columns are the columns owned by the default column target whose key is "feature_columns".
+// For now, two schemas are supported:
+// 1. sparse-kv
+//		schema: COLUMN SPARSE([feature_column], [1-dim shape], [single char delimiter])
+//		data example: COLUMN SPARSE("0:1.5 1:100.1f 11:-1.2", [20], " ")
+// 2. tf feature columns
+//		Roughly the same as TFEstimator, except that the output shapes of feature columns are required to be 1-dim.
 func parseFeatureColumns(columns *exprlist, r *xgboostFiller) error {
 	feaCols, colSpecs, err := resolveTrainColumns(columns)
 	if err != nil {
@@ -337,7 +341,7 @@ func parseFeatureColumns(columns *exprlist, r *xgboostFiller) error {
 	return nil
 }
 
-// parse sparse kv feature, which identified by `SPARSE`.
+// parseSparseKeyValueFeatures parses features identified by `SPARSE`.
 // ex: SPARSE(col1, [100], comma)
 func parseSparseKeyValueFeatures(colSpecs []*columnSpec, r *xgboostFiller) error {
 	var colNames []string
@@ -391,10 +395,7 @@ func parseDenseFeatures(feaCols []featureColumn, r *xgboostFiller) error {
 		if allSimpleCol && !isSimpleColumn(col) {
 			allSimpleCol = false
 		}
-		// FIXME(typhoonzero): Use Heuristic rules to determine whether a column should be transformed to a
-		// tf.SparseTensor. Currently the rules are:
-		// if column have delimiter and it's not a sequence_catigorical_column, we'll treat it as a sparse column
-		// else, use dense column.
+
 		isSparse := false
 		var isEmb bool
 		_, ok := col.(*sequenceCategoryIDColumn)
@@ -482,6 +483,9 @@ func xgParseColumns(pr *extendedSelect, filler *xgboostFiller) error {
 			return xgParseColumnError(target, e)
 		}
 	case "group":
+		if !pr.train {
+			continue
+		}
 		colMeta, e := parseSimpleColumn("group", &columns)
 		if e != nil {
 			return xgParseColumnError(target, e)
@@ -489,6 +493,9 @@ func xgParseColumns(pr *extendedSelect, filler *xgboostFiller) error {
 		filler.GroupField = colMeta
 		filler.Group = colMeta.FeatureName
 	case "weight":
+		if !pr.train {
+			continue
+		}
 		colMeta, e := parseSimpleColumn("weight", &columns)
 		if e != nil {
 			return xgParseColumnError(target, e)
@@ -499,10 +506,13 @@ func xgParseColumns(pr *extendedSelect, filler *xgboostFiller) error {
 			return xgParseColumnError(target, xgUnsupportedColTagError())
 		}
 	}
-	filler.LabelField = &xgFeatureMeta{
-		FeatureName: pr.label,
+	// in predict mode, ignore label info
+	if pr.train {
+		filler.LabelField = &xgFeatureMeta{
+			FeatureName: pr.label,
+		}
+		filler.Label = pr.label
 	}
-	filler.Label = pr.label
 	return nil
 }
 
@@ -546,14 +556,14 @@ func xgParseEstimator(pr *extendedSelect, filler *xgboostFiller) error {
 
 func newXGBoostFiller(pr *extendedSelect, fts fieldTypes, db *DB) (*xgboostFiller, error) {
 	filler := &xgboostFiller{
-		isTrain:        pr.train,
-		modelPath:      pr.save,
-		standardSelect: pr.standardSelect.String(),
+		modelPath: pr.save,
 	}
+	filler.IsTrain = pr.train
+	filler.StandardSelect = pr.standardSelect.String()
 
 	// solve keyword: WITH (attributes)
 	if e := xgParseAttr(pr, filler); e != nil {
-		return nil, fmt.Errorf("failed to set xgboost attributes: %exp", e)
+		return nil, fmt.Errorf("failed to set xgboost attributes: %v", e)
 	}
 
 	// solve keyword: TRAIN (estimator)
@@ -566,7 +576,31 @@ func newXGBoostFiller(pr *extendedSelect, fts fieldTypes, db *DB) (*xgboostFille
 		return nil, e
 	}
 
-	return xgFillDatabaseInfo(filler, db)
+	// fill database info
+	if _, e := xgFillDatabaseInfo(filler, db); e != nil {
+		return nil, e
+	}
+
+	// serialize fields
+	jsonBuffer, e := json.Marshal(filler.xgboostFields)
+	if e != nil {
+		return nil, e
+	}
+	filler.xgboostJSON = string(jsonBuffer)
+
+	jsonBuffer, e = json.Marshal(filler.xgDataSourceFields)
+	if e != nil {
+		return nil, e
+	}
+	filler.xgDataSourceJSON = string(jsonBuffer)
+
+	jsonBuffer, e = json.Marshal(filler.xgColumnFields)
+	if e != nil {
+		return nil, e
+	}
+	filler.xgColumnJSON = string(jsonBuffer)
+
+	return filler, nil
 }
 
 func xgFillDatabaseInfo(r *xgboostFiller, db *DB) (*xgboostFiller, error) {
@@ -591,7 +625,7 @@ func xgFillDatabaseInfo(r *xgboostFiller, db *DB) (*xgboostFiller, error) {
 		r.Host, r.Port, r.Database = sa[0], sa[1], cfg.DBName
 		r.User, r.Password = cfg.User, cfg.Passwd
 		// remove the last ';' which leads to a ParseException
-		r.standardSelect = removeLastSemicolon(r.standardSelect)
+		r.StandardSelect = removeLastSemicolon(r.StandardSelect)
 	case "maxcompute":
 		cfg, err := gomaxcompute.ParseDSN(db.dataSourceName)
 		if err != nil {
diff --git a/sql/codegen_xgboost_test.go b/sql/codegen_xgboost_test.go
index 45aa29920e..6bfbf8efee 100644
--- a/sql/codegen_xgboost_test.go
+++ b/sql/codegen_xgboost_test.go
@@ -49,12 +49,12 @@ func TestPartials(t *testing.T) {
 	a.Equal(filler.Objective, "reg:linear")
 
 	// test uIntPartial
-	part = uIntPartial("workers", func(r *xgboostFiller) *uint { return &(r.WorkerNum) })
-	tmpMap["workers"] = []string{"10"}
+	part = uIntPartial("num_class", func(r *xgboostFiller) *uint { return &(r.NumClass) })
+	tmpMap["num_class"] = []string{"3"}
 	e = part(&tmpMap, filler)
 	a.NoError(e)
-	a.EqualValues(filler.WorkerNum, 10)
-	_, ok = tmpMap["workers"]
+	a.EqualValues(filler.NumClass, 3)
+	_, ok = tmpMap["num_class"]
 	a.Equal(ok, false)
 
 	// test fp32Partial
@@ -67,25 +67,25 @@ func TestPartials(t *testing.T) {
 	a.Equal(ok, false)
 
 	// test boolPartial
-	part = boolPartial("run_local", func(r *xgboostFiller) *bool { return &(r.runLocal) })
-	tmpMap["run_local"] = []string{"false"}
+	part = boolPartial("auto_train", func(r *xgboostFiller) *bool { return &(r.AutoTrain) })
+	tmpMap["auto_train"] = []string{"false"}
 	e = part(&tmpMap, filler)
 	a.NoError(e)
-	a.Equal(filler.runLocal, false)
-	_, ok = tmpMap["run_local"]
+	a.Equal(filler.AutoTrain, false)
+	_, ok = tmpMap["auto_train"]
 	a.Equal(ok, false)
-	tmpMap["run_local"] = []string{"true"}
+	tmpMap["auto_train"] = []string{"true"}
 	e = part(&tmpMap, filler)
 	a.NoError(e)
-	a.Equal(filler.runLocal, true)
+	a.Equal(filler.AutoTrain, true)
 
 	// test sListPartial
-	part = sListPartial("app_col", func(r *xgboostFiller) *[]string { return &(r.AppendColumns) })
-	tmpMap["app_col"] = []string{"AA", "BB", "CC"}
+	part = sListPartial("append_columns", func(r *xgboostFiller) *[]string { return &(r.AppendColumns) })
+	tmpMap["append_columns"] = []string{"AA", "BB", "CC"}
 	e = part(&tmpMap, filler)
 	a.NoError(e)
 	a.EqualValues(filler.AppendColumns, []string{"AA", "BB", "CC"})
-	_, ok = tmpMap["app_col"]
+	_, ok = tmpMap["append_columns"]
 	a.Equal(ok, false)
 }
 
@@ -97,15 +97,19 @@ func TestXGBoostAttr(t *testing.T) {
 	}
 
 	parser := newParser()
-	filler := &xgboostFiller{}
-	testClause := `
+	parseAndFill := func(clause string) *xgboostFiller {
+		filler := &xgboostFiller{}
+		r, e := parser.Parse(clause)
+		a.NoError(e)
+		e = xgParseAttr(r, filler)
+		a.NoError(e)
+		return filler
+	}
+
+	trainClause := `
 SELECT a, b, c, d, e FROM table_xx
 TRAIN XGBoostEstimator
 WITH
-	run_local = true,
-	workers = 11,
-	memory = 8192,
-	cpu = 4,
 	objective = "binary:logistic",
 	booster = gblinear,
 	num_class = 2,
@@ -118,20 +122,11 @@ WITH
 	max_bin = 128,
 	verbosity = 3,
 	num_round = 300,
-	auto_train = true,
-	detail_column = "prediction_detail",
-	prob_column = "prediction_probability",
-	encoding_column = "prediction_leafs",
-	result_column = "prediction_results",
-	append_columns = ["AA", "BB", "CC"]
+	auto_train = true
COLUMN a, b, c, d
-LABEL e INTO model_table;
+LABEL e INTO table_123;
 `
-	r, e := parser.Parse(testClause)
-	a.NoError(e)
-	e = xgParseAttr(r, filler)
-	a.NoError(e)
-
+	filler := parseAndFill(trainClause)
 	data, e := json.Marshal(filler.xgboostFields)
 	a.NoError(e)
 	mapData := make(map[string]interface{})
@@ -153,26 +148,23 @@ LABEL e INTO model_table;
 	assertEq(mapData, "num_boost_round", 300)
 	assertEq(mapData, "auto_train", true)
 
-	data, e = json.Marshal(filler.xgColumnFields)
-	a.NoError(e)
-	mapData = make(map[string]interface{})
-	e = json.Unmarshal(data, &mapData)
-	a.NoError(e)
-	ret, _ := mapData["result_columns"]
-	retMap, _ := ret.(map[string]interface{})
-	assertEq(retMap, "result_column", "prediction_results")
-	assertEq(retMap, "probability_column", "prediction_probability")
-	assertEq(retMap, "detail_column", "prediction_detail")
-	assertEq(retMap, "leaf_column", "prediction_leafs")
-	assertEq(mapData, "append_columns", []interface{}{"AA", "BB", "CC"})
-
-	data, e = json.Marshal(filler.xgRuntimeResourceFields)
-	mapData = make(map[string]interface{})
-	e = json.Unmarshal(data, &mapData)
-	a.NoError(e)
-	assertEq(mapData, "worker_num", 11)
-	assertEq(mapData, "memory_size", 8192)
-	assertEq(mapData, "cpu_size", 4)
+	predClause := `
+SELECT a, b, c, d, e FROM table_xx
+PREDICT table_yy
+WITH
+	detail_column = "prediction_detail",
+	prob_column = "prediction_probability",
+	encoding_column = "prediction_leafs",
+	result_column = "prediction_results",
+	append_columns = ["AA", "BB", "CC"]
+USING sqlflow_models.my_xgboost_model;
+`
+	filler = parseAndFill(predClause)
+	a.EqualValues([]string{"AA", "BB", "CC"}, filler.AppendColumns)
+	a.EqualValues("prediction_probability", filler.ProbColumn)
+	a.EqualValues("prediction_results", filler.ResultColumn)
+	a.EqualValues("prediction_detail", filler.DetailColumn)
+	a.EqualValues("prediction_leafs", filler.EncodingColumn)
 }
 
 func TestColumnClause(t *testing.T) {
@@ -285,33 +277,30 @@ LABEL e INTO model_table;
 
 func TestXGBoostFiller(t *testing.T) {
 	a := assert.New(t)
+
 	parser := newParser()
 
-	testClause := `
+	trainClause := `
 SELECT * FROM iris.train
 TRAIN XGBoostRegressor
 WITH
-	run_local = true,
 	max_depth = 5,
 	eta = 0.03,
 	tree_method = "hist",
-	num_round = 300,
-	append_columns = ["A", B, "C"]
+	num_round = 300
 COLUMN sepal_length, sepal_width, petal_length, petal_width
 COLUMN gg FOR group
 COLUMN ww FOR weight
 LABEL e INTO model_table;
 `
-	pr, e := parser.Parse(testClause)
+	pr, e := parser.Parse(trainClause)
 	a.NoError(e)
 	fts, e := verify(pr, testDB)
 	a.NoError(e)
 	filler, e := newXGBoostFiller(pr, fts, testDB)
 	a.NoError(e)
 
-	a.True(filler.isTrain)
-	a.True(filler.runLocal)
-	stdSlct := strings.Replace(filler.standardSelect, "\n", " ", -1)
-	stdSlct = removeLastSemicolon(stdSlct)
+	a.True(filler.IsTrain)
+	stdSlct := removeLastSemicolon(strings.Replace(filler.StandardSelect, "\n", " ", -1))
 	a.EqualValues("SELECT * FROM iris.train", stdSlct)
 	a.EqualValues("model_table", filler.modelPath)
@@ -320,7 +309,6 @@ LABEL e INTO model_table;
 	a.EqualValues(5, filler.MaxDepth)
 	a.EqualValues("hist", filler.TreeMethod)
 	a.EqualValues(300, filler.NumRound)
-	a.EqualValues([]string{"A", "B", "C"}, filler.AppendColumns)
 
 	a.EqualValues("e", filler.Label)
 	a.EqualValues("e", filler.LabelField.FeatureName)
@@ -338,4 +326,17 @@ LABEL e INTO model_table;
 	a.EqualValues(&xgFeatureMeta{FeatureName: "sepal_width", Dtype: "float32", InputShape: "[1]"}, filler.X[1])
 	a.EqualValues(&xgFeatureMeta{FeatureName: "petal_length", Dtype: "float32", InputShape: "[1]"}, filler.X[2])
 	a.EqualValues(&xgFeatureMeta{FeatureName: "petal_width", Dtype: "float32", InputShape: "[1]"}, filler.X[3])
+
+	colFields := &xgColumnFields{}
+	e = json.Unmarshal([]byte(filler.xgColumnJSON), colFields)
+	a.NoError(e)
+	a.EqualValues(filler.xgColumnFields, *colFields)
+	dsFields := &xgDataSourceFields{}
+	e = json.Unmarshal([]byte(filler.xgDataSourceJSON), dsFields)
+	a.NoError(e)
+	a.EqualValues(filler.xgDataSourceFields, *dsFields)
+	xgbFields := &xgboostFields{}
+	e = json.Unmarshal([]byte(filler.xgboostJSON), xgbFields)
+	a.NoError(e)
+	a.EqualValues(filler.xgboostFields, *xgbFields)
 }
diff --git a/sql/python/sqlflow_submitter/xgboost/sqlflow_data_source.py b/sql/python/sqlflow_submitter/xgboost/sqlflow_data_source.py
index d773b72826..e21cc4e11e 100644
--- a/sql/python/sqlflow_submitter/xgboost/sqlflow_data_source.py
+++ b/sql/python/sqlflow_submitter/xgboost/sqlflow_data_source.py
@@ -10,6 +10,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import json
 import typing
 from typing import Iterator
@@ -41,7 +42,7 @@ def convert_shape(cls, value) -> typing.List:
         raise XGBoostError('invalid shape %s of FeatureMeta' % value)
 
 
-class SqlflowDSConfig(typing.NamedTuple):
+class SQLFlowDSConfig(typing.NamedTuple):
     is_train: bool
     standard_select: str
     db_config: typing.Dict
@@ -55,13 +56,13 @@ class SqlflowDSConfig(typing.NamedTuple):
     write_batch_size: int = 1024
 
 
-class SqlFlowDataSource(DataSource):
+class SQLFlowDataSource(DataSource):
     def __init__(self, rank: int, num_worker: int,
                  column_conf: config_fields.ColumnFields, source_conf):
         super().__init__(rank, num_worker, column_conf, source_conf)
-        if not isinstance(source_conf, SqlflowDSConfig):
-            raise XGBoostError("SqlflowDataSource: invalid source conf")
+        if not isinstance(source_conf, SQLFlowDSConfig):
+            raise XGBoostError("SQLFlowDataSource: invalid source conf")
 
         # TODO: support tf.feature_column transformation
         if source_conf.is_tf_integrated:
diff --git a/sql/python/sqlflow_submitter/xgboost/sqlflow_data_source_test.py b/sql/python/sqlflow_submitter/xgboost/sqlflow_data_source_test.py
index 51f98909fc..809a6333d0 100644
--- a/sql/python/sqlflow_submitter/xgboost/sqlflow_data_source_test.py
+++ b/sql/python/sqlflow_submitter/xgboost/sqlflow_data_source_test.py
@@ -10,20 +10,21 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import json
 import os
 from unittest import TestCase
 
-from sqlflow_submitter.xgboost.sqlflow_data_source import SqlflowDSConfig, SqlFlowDataSource
+from sqlflow_submitter.xgboost.sqlflow_data_source import SQLFlowDSConfig, SQLFlowDataSource
 from launcher import config_helper, config_fields, register_data_source, XGBoostRecord, XGBoostResult
 from launcher.data_source import create_data_source_init_fn
 from sqlflow_submitter.db_test import execute as db_exec
 from sqlflow_submitter.db import connect, insert_values
 
 
-class TestSqlFlowDataSource(TestCase):
+class TestSQLFlowDataSource(TestCase):
     def setUp(self) -> None:
-        register_data_source('sqlflow', SqlflowDSConfig, SqlFlowDataSource)
+        register_data_source('sqlflow', SQLFlowDSConfig, SQLFlowDataSource)
 
         db_conf = {
             'driver': 'mysql',
@@ -116,13 +117,13 @@ def test_build(self):
             return
 
         ds = self._ds_builder(True)
-        assert isinstance(ds, SqlFlowDataSource)
+        assert isinstance(ds, SQLFlowDataSource)
         assert ds._label_offset == 3
         assert ds._group_offset == 4
         assert ds._weight_offset == 5
 
         ds = self._ds_builder(False)
-        assert isinstance(ds, SqlFlowDataSource)
+        assert isinstance(ds, SQLFlowDataSource)
         assert ds._append_indices == [3, 4, 1]
 
     def test_read(self):
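
For reference, the heart of this patch is that `xgboostFiller` no longer carries loose runtime fields; instead, `newXGBoostFiller` serializes `xgboostFields`, `xgDataSourceFields`, and `xgColumnFields` into the `xgboostJSON`, `xgDataSourceJSON`, and `xgColumnJSON` strings via `encoding/json`, and the new tests round-trip them back. Below is a minimal, self-contained sketch of that round-trip (illustrative only; `dataSourceFields` is a hypothetical stand-in mirroring just three members of the real `xgDataSourceFields`, not the actual SQLFlow type):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// dataSourceFields is a trimmed stand-in for xgDataSourceFields in the
// patch; the json tags match the ones added there ("is_train",
// "standard_select", "write_batch_size").
type dataSourceFields struct {
	IsTrain        bool   `json:"is_train,omitempty"`
	StandardSelect string `json:"standard_select,omitempty"`
	WriteBatchSize int    `json:"write_batch_size,omitempty"`
}

func main() {
	src := dataSourceFields{
		IsTrain:        true,
		StandardSelect: "SELECT * FROM iris.train",
		WriteBatchSize: 1024,
	}

	// Serialize, as newXGBoostFiller now does for each field group.
	buf, err := json.Marshal(src)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(buf))

	// Deserialize and compare, as the new TestXGBoostFiller assertions do.
	var dst dataSourceFields
	if err := json.Unmarshal(buf, &dst); err != nil {
		panic(err)
	}
	fmt.Println("round-trip equal:", dst == src)
}
```

Since every tag carries `omitempty`, zero-valued members (for instance `is_train` at prediction time) are dropped from the payload; on the Python side, the `SQLFlowDSConfig` NamedTuple defaults (e.g. `write_batch_size: int = 1024`) presumably cover such omitted keys.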