diff --git a/scripts/image_build.sh b/scripts/image_build.sh
index 1dcf862f30..071921c435 100644
--- a/scripts/image_build.sh
+++ b/scripts/image_build.sh
@@ -114,4 +114,4 @@ echo 'get_ipython().magic(u"%autoreload 2")' >> $IPYTHON_STARTUP/00-first.py
 curl https://raw.githubusercontent.com/sql-machine-learning/sqlflow/develop/example/jupyter/example.ipynb --output /workspace/example.ipynb

 # 9. install xgboost-launcher
-pip install xgboost-launcher==0.0.1
+pip install xgboost-launcher==0.0.3
diff --git a/sql/codegen_xgboost.go b/sql/codegen_xgboost.go
index f54f5aa8ce..269fb10e55 100644
--- a/sql/codegen_xgboost.go
+++ b/sql/codegen_xgboost.go
@@ -14,10 +14,12 @@ package sql

 import (
+	"bytes"
 	"encoding/json"
 	"fmt"
 	"strconv"
 	"strings"
+	"text/template"

 	"github.com/go-sql-driver/mysql"
 	"sqlflow.org/gohive"
@@ -25,18 +27,18 @@ import (
 )

 type xgboostFiller struct {
-	modelPath string
-	xgboostFields
+	ModelPath string
+	xgLearningFields
 	xgColumnFields
 	xgDataSourceFields
-	xgboostJSON      string
-	xgDataSourceJSON string
-	xgColumnJSON     string
+	LearningJSON   string
+	DataSourceJSON string
+	ColumnJSON     string
 }

-type xgboostFields struct {
+type xgLearningFields struct {
 	NumRound uint `json:"num_boost_round,omitempty"`
-	AutoTrain bool `json:"auto_train,omitempty"`
+	AutoTrain bool `json:"auto_train"`
 	xgBoosterFields `json:"params,omitempty"`
 }

@@ -74,30 +76,31 @@ type xgResultColumnFields struct {

 type xgFeatureFields struct {
 	FeatureColumns []string `json:"columns,omitempty"`
-	IsSparse bool `json:"is_sparse,omitempty"`
+	IsSparse bool `json:"is_sparse"`
 	Delimiter string `json:"item_delimiter,omitempty"`
 	FeatureSize uint `json:"feature_num,omitempty"`
 }

 type xgDataSourceFields struct {
-	IsTrain bool `json:"is_train,omitempty"`
+	IsTrain bool `json:"is_train"`
 	StandardSelect string `json:"standard_select,omitempty"`
-	IsTensorFlowIntegrated bool `json:"is_tf_integrated,omitempty"`
+	IsTensorFlowIntegrated bool `json:"is_tf_integrated"`
 	X []*xgFeatureMeta `json:"x,omitempty"`
 	LabelField *xgFeatureMeta `json:"label,omitempty"`
 	WeightField *xgFeatureMeta `json:"weight,omitempty"`
 	GroupField *xgFeatureMeta `json:"group,omitempty"`
 	xgDataBaseField `json:"db_config,omitempty"`
-	WriteBatchSize int `json:"write_batch_size,omitempty"`
+	OutputTable string `json:"output_table,omitempty"`
+	WriteBatchSize int `json:"write_batch_size,omitempty"`
 }

 type xgDataBaseField struct {
-	User string `json:"user,omitempty"`
-	Password string `json:"password,omitempty"`
-	Host string `json:"host,omitempty"`
-	Port string `json:"port,omitempty"`
-	Database string `json:"database,omitempty"`
-	Driver string `json:"driver,omitempty"`
+	User string `json:"user"`
+	Password string `json:"password"`
+	Host string `json:"host"`
+	Port string `json:"port"`
+	Database string `json:"database"`
+	Driver string `json:"driver"`
 }

 type xgFeatureMeta struct {
@@ -554,9 +557,10 @@ func xgParseEstimator(pr *extendedSelect, filler *xgboostFiller) error {
 	return nil
 }

+// TODO(sperlingxx): support trainAndValDataset
 func newXGBoostFiller(pr *extendedSelect, fts fieldTypes, db *DB) (*xgboostFiller, error) {
 	filler := &xgboostFiller{
-		modelPath: pr.save,
+		ModelPath: pr.save,
 	}
 	filler.IsTrain = pr.train
 	filler.StandardSelect = pr.standardSelect.String()
@@ -565,10 +569,22 @@ func newXGBoostFiller(pr *extendedSelect, fts fieldTypes, db *DB) (*xgboostFille
 	if e := xgParseAttr(pr, filler); e != nil {
 		return nil, fmt.Errorf("failed to set xgboost attributes: %v", e)
 	}
+	// set default value of result column field in pred mode
+	if !pr.train && len(filler.ResultColumn) == 0 {
+		filler.ResultColumn = "result"
+	}

-	// solve keyword: TRAIN (estimator)
-	if e := xgParseEstimator(pr, filler); e != nil {
-		return nil, e
+	if pr.train {
+		// solve keyword: TRAIN (estimator)
+		if e := xgParseEstimator(pr, filler); e != nil {
+			return nil, e
+		}
+	} else {
+		// solve keyword: PREDICT (output_table)
+		if len(pr.into) == 0 {
+			return nil, fmt.Errorf("missing output table in xgboost prediction clause")
+		}
+		filler.OutputTable = pr.into
 	}

 	// solve keyword: COLUMN (column clauses)
@@ -582,23 +598,23 @@ func newXGBoostFiller(pr *extendedSelect, fts fieldTypes, db *DB) (*xgboostFille
 	}

 	// serialize fields
-	jsonBuffer, e := json.Marshal(filler.xgboostFields)
+	jsonBuffer, e := json.Marshal(filler.xgLearningFields)
 	if e != nil {
 		return nil, e
 	}
-	filler.xgboostJSON = string(jsonBuffer)
+	filler.LearningJSON = string(jsonBuffer)

 	jsonBuffer, e = json.Marshal(filler.xgDataSourceFields)
 	if e != nil {
 		return nil, e
 	}
-	filler.xgDataSourceJSON = string(jsonBuffer)
+	filler.DataSourceJSON = string(jsonBuffer)

 	jsonBuffer, e = json.Marshal(filler.xgColumnFields)
 	if e != nil {
 		return nil, e
 	}
-	filler.xgColumnJSON = string(jsonBuffer)
+	filler.ColumnJSON = string(jsonBuffer)

 	return filler, nil
 }
@@ -639,3 +655,85 @@ func xgFillDatabaseInfo(r *xgboostFiller, db *DB) (*xgboostFiller, error) {
 	}
 	return r, nil
 }
+
+func xgCreatePredictionTable(pr *extendedSelect, r *xgboostFiller, db *DB) error {
+	dropStmt := fmt.Sprintf("drop table if exists %s;", r.OutputTable)
+	if _, e := db.Exec(dropStmt); e != nil {
+		return fmt.Errorf("failed executing %s: %q", dropStmt, e)
+	}
+
+	fts, e := verify(pr, db)
+	if e != nil {
+		return e
+	}
+
+	var b bytes.Buffer
+	fmt.Fprintf(&b, "create table %s (", r.OutputTable)
+	for _, col := range r.AppendColumns {
+		typ, ok := fts.get(col)
+		if !ok {
+			return fmt.Errorf("xgCreatePredictionTable: Cannot find type of field %s", col)
+		}
+		stype, e := universalizeColumnType(db.driverName, typ)
+		if e != nil {
+			return e
+		}
+		fmt.Fprintf(&b, "%s %s, ", col, stype)
+	}
+	// add prob column
+	if len(r.ProbColumn) > 0 {
+		stype, e := universalizeColumnType(db.driverName, "DOUBLE")
+		if e != nil {
+			return e
+		}
+		fmt.Fprintf(&b, "%s %s, ", r.ProbColumn, stype)
+	}
+	// add detail column
+	if len(r.DetailColumn) > 0 {
+		stype, e := universalizeColumnType(db.driverName, "VARCHAR")
+		if e != nil {
+			return e
+		}
+		fmt.Fprintf(&b, "%s %s, ", r.DetailColumn, stype)
+	}
+	// add encoding column
+	if len(r.EncodingColumn) > 0 {
+		stype, e := universalizeColumnType(db.driverName, "VARCHAR")
+		if e != nil {
+			return e
+		}
+		fmt.Fprintf(&b, "%s %s, ", r.EncodingColumn, stype)
+	}
+	// add result column
+	stype, e := universalizeColumnType(db.driverName, "DOUBLE")
+	if e != nil {
+		return e
+	}
+	fmt.Fprintf(&b, "%s %s);", r.ResultColumn, stype)
+
+	createStmt := b.String()
+	if _, e := db.Exec(createStmt); e != nil {
+		return fmt.Errorf("failed executing %s: %q", createStmt, e)
+	}
+	return nil
+}
+
+var xgTemplate = template.Must(template.New("codegenXG").Parse(xgTemplateText))
+
+const xgTemplateText = `
+from launcher.config_fields import JobType
+from sqlflow_submitter.xgboost import run_with_sqlflow
+
+{{if .IsTrain}}
+mode = JobType.TRAIN
+{{else}}
+mode = JobType.PREDICT
+{{end}}
+
+run_with_sqlflow(
+    mode=mode,
+    model_path='{{.ModelPath}}',
+    learning_config='{{.LearningJSON}}',
+    data_source_config='{{.DataSourceJSON}}',
+    column_config='{{.ColumnJSON}}')
+`
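For orientation, here is a sketch of the submitter program that xgTemplate could render for the TRAIN path. The JSON payloads are illustrative stand-ins for what json.Marshal produces from xgLearningFields, xgDataSourceFields, and xgColumnFields; the key names inside "params", the credentials, and the column-config layout are assumptions rather than verbatim SQLFlow output.

```python
# Hypothetical render of xgTemplate for a TRAIN statement (illustration only).
from launcher.config_fields import JobType
from sqlflow_submitter.xgboost import run_with_sqlflow

mode = JobType.TRAIN

run_with_sqlflow(
    mode=mode,
    model_path='sqlflow_models.my_xgboost_model',
    # json.Marshal(filler.xgLearningFields): num_boost_round/auto_train/params follow the json tags
    # above; the key names inside "params" are assumptions based on common XGBoost parameter names.
    learning_config='{"num_boost_round": 30, "auto_train": false, "params": {"objective": "multi:softmax", "num_class": 3, "max_depth": 5, "eta": 0.3}}',
    # json.Marshal(filler.xgDataSourceFields): field names follow the json tags above; values are placeholders.
    data_source_config='{"is_train": true, "standard_select": "SELECT * FROM iris.train", "is_tf_integrated": false, "db_config": {"user": "root", "password": "root", "host": "127.0.0.1", "port": "3306", "database": "", "driver": "mysql"}}',
    # json.Marshal(filler.xgColumnFields): abridged; the real layout mirrors xgColumnFields.
    column_config='{"columns": ["sepal_length", "sepal_width", "petal_length", "petal_width"], "is_sparse": false}')
```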
diff --git a/sql/codegen_xgboost_test.go b/sql/codegen_xgboost_test.go
index 6bfbf8efee..39cbb972bc 100644
--- a/sql/codegen_xgboost_test.go
+++ b/sql/codegen_xgboost_test.go
@@ -20,6 +20,35 @@ import (
 	"testing"
 )

+const (
+	testXGTrainSelectIris = `
+SELECT *
+FROM iris.train
+TRAIN XGBoostEstimator
+WITH
+	objective = "multi:softmax",
+	num_class = 3,
+	max_depth = 5,
+	eta = 0.3,
+	tree_method = "approx",
+	num_round = 30
+COLUMN sepal_length, sepal_width, petal_length, petal_width
+LABEL class INTO sqlflow_models.my_xgboost_model;
+`
+
+	testXGPredSelectIris = `
+SELECT *
+FROM iris.test
+PREDICT iris.predict
+WITH
+	append_columns = [sepal_length, sepal_width, petal_length, petal_width],
+	prob_column = prob,
+	detail_column = detail,
+	encoding_column = encoding
+USING sqlflow_models.my_xgboost_model;
+`
+)
+
 func TestPartials(t *testing.T) {
 	a := assert.New(t)
 	tmpMap := make(map[string][]string)
@@ -127,7 +156,7 @@ COLUMN a, b, c, d
 LABEL e INTO table_123;
 `
 	filler := parseAndFill(trainClause)
-	data, e := json.Marshal(filler.xgboostFields)
+	data, e := json.Marshal(filler.xgLearningFields)
 	a.NoError(e)
 	mapData := make(map[string]interface{})
 	e = json.Unmarshal(data, &mapData)
@@ -302,7 +331,7 @@ LABEL e INTO model_table;
 	a.True(filler.IsTrain)
 	stdSlct := removeLastSemicolon(strings.Replace(filler.StandardSelect, "\n", " ", -1))
 	a.EqualValues("SELECT * FROM iris.train", stdSlct)
-	a.EqualValues("model_table", filler.modelPath)
+	a.EqualValues("model_table", filler.ModelPath)

 	a.EqualValues("reg:squarederror", filler.Objective)
 	a.EqualValues(0.03, filler.Eta)
@@ -328,15 +357,15 @@ LABEL e INTO model_table;
 	a.EqualValues(&xgFeatureMeta{FeatureName: "petal_width", Dtype: "float32", InputShape: "[1]"}, filler.X[3])

 	colFields := &xgColumnFields{}
-	e = json.Unmarshal([]byte(filler.xgColumnJSON), colFields)
+	e = json.Unmarshal([]byte(filler.ColumnJSON), colFields)
 	a.NoError(e)
 	a.EqualValues(filler.xgColumnFields, *colFields)
 	dsFields := &xgDataSourceFields{}
-	e = json.Unmarshal([]byte(filler.xgDataSourceJSON), dsFields)
+	e = json.Unmarshal([]byte(filler.DataSourceJSON), dsFields)
 	a.NoError(e)
 	a.EqualValues(filler.xgDataSourceFields, *dsFields)
-	xgbFields := &xgboostFields{}
-	e = json.Unmarshal([]byte(filler.xgboostJSON), xgbFields)
+	xgbFields := &xgLearningFields{}
+	e = json.Unmarshal([]byte(filler.LearningJSON), xgbFields)
 	a.NoError(e)
-	a.EqualValues(filler.xgboostFields, *xgbFields)
+	a.EqualValues(filler.xgLearningFields, *xgbFields)
 }
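For the PREDICT statement above, the data-source section of the generated config would take roughly the following shape. Field names follow the json tags on xgDataSourceFields; the concrete values (credentials, batch size) are placeholders, and the feature/label metadata (x, label) is omitted for brevity.

```python
import json

# Rough shape of data_source_config for testXGPredSelectIris (illustration only; values are placeholders).
data_source_config = json.loads("""
{
  "is_train": false,
  "standard_select": "SELECT * FROM iris.test",
  "is_tf_integrated": false,
  "output_table": "iris.predict",
  "write_batch_size": 1024,
  "db_config": {
    "user": "root",
    "password": "root",
    "host": "127.0.0.1",
    "port": "3306",
    "database": "",
    "driver": "mysql"
  }
}
""")
assert data_source_config["output_table"] == "iris.predict"
```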
diff --git a/sql/column_type.go b/sql/column_type.go
index 4bac8a5b94..0e932f0984 100644
--- a/sql/column_type.go
+++ b/sql/column_type.go
@@ -186,6 +186,10 @@ func universalizeColumnType(driverName, dialectType string) (string, error) {
 		if strings.HasSuffix(dialectType, hiveCTypeSuffix) {
 			return dialectType[:len(dialectType)-len(hiveCTypeSuffix)], nil
 		}
+		// In Hive, a VARCHAR field requires an explicit length, so we map a bare VARCHAR to STRING.
+		if dialectType == "VARCHAR" {
+			return "STRING", nil
+		}
 		return dialectType, nil
 	}
 	return "", fmt.Errorf("not support driver:%s", driverName)
diff --git a/sql/executor.go b/sql/executor.go
index 960d8431d9..1aa5cc7200 100644
--- a/sql/executor.go
+++ b/sql/executor.go
@@ -344,8 +344,19 @@ func train(wr *PipeWriter, tr *extendedSelect, db *DB, cwd string, modelDir stri
 	}

 	var program bytes.Buffer
-	if e := genTF(&program, tr, ds, fts, db); e != nil {
-		return fmt.Errorf("genTF %v", e)
+	// FIXME(sperlingxx): write a separate train pipeline for xgboost to support remote mode
+	if os.Getenv("SQLFLOW_submitter") == "XGBOOST" {
+		filler, e := newXGBoostFiller(tr, fts, db)
+		if e != nil {
+			return fmt.Errorf("genXG %v", e)
+		}
+		if e := xgTemplate.Execute(&program, filler); e != nil {
+			return fmt.Errorf("genXG %v", e)
+		}
+	} else {
+		if e := genTF(&program, tr, ds, fts, db); e != nil {
+			return fmt.Errorf("genTF %v", e)
+		}
 	}

 	cw := &logChanWriter{wr: wr}
@@ -387,10 +398,6 @@ func pred(wr *PipeWriter, pr *extendedSelect, db *DB, cwd string, modelDir strin
 		return fmt.Errorf("verifyColumnNameAndType: %v", e)
 	}

-	if e := createPredictionTable(tr, pr, db); e != nil {
-		return fmt.Errorf("createPredictionTable: %v", e)
-	}
-
 	pr.trainClause = tr.trainClause
 	fts, e := verify(pr, db)
 	if e != nil {
@@ -398,8 +405,25 @@ func pred(wr *PipeWriter, pr *extendedSelect, db *DB, cwd string, modelDir strin
 	}

 	var buf bytes.Buffer
-	if e := genTF(&buf, pr, nil, fts, db); e != nil {
-		return fmt.Errorf("genTF: %v", e)
+	// FIXME(sperlingxx): write a separate pred pipeline for xgboost to support remote mode
+	if os.Getenv("SQLFLOW_submitter") == "XGBOOST" {
+		filler, e := newXGBoostFiller(pr, fts, db)
+		if e != nil {
+			return fmt.Errorf("genXG %v", e)
+		}
+		if e := xgCreatePredictionTable(pr, filler, db); e != nil {
+			return fmt.Errorf("genXG %v", e)
+		}
+		if e := xgTemplate.Execute(&buf, filler); e != nil {
+			return fmt.Errorf("genXG %v", e)
+		}
+	} else {
+		if e := createPredictionTable(tr, pr, db); e != nil {
+			return fmt.Errorf("createPredictionTable: %v", e)
+		}
+		if e := genTF(&buf, pr, nil, fts, db); e != nil {
+			return fmt.Errorf("genTF %v", e)
+		}
 	}

 	cw := &logChanWriter{wr: wr}
diff --git a/sql/executor_test.go b/sql/executor_test.go
index 82572f34fe..257d0cd96e 100644
--- a/sql/executor_test.go
+++ b/sql/executor_test.go
@@ -67,6 +67,23 @@ func TestSplitExtendedSQL(t *testing.T) {
 	a.Equal(`train a with b;`, s[0])
 }

+func TestExecutorTrainAndPredictXGBoost(t *testing.T) {
+	a := assert.New(t)
+	modelDir, e := ioutil.TempDir("/tmp", "sqlflow_models")
+	a.Nil(e)
+	defer os.RemoveAll(modelDir)
+	defer os.Unsetenv("SQLFLOW_submitter")
+	e = os.Setenv("SQLFLOW_submitter", "XGBOOST")
+	a.NoError(e)
+	a.NotPanics(func() {
+		stream := runExtendedSQL(testXGTrainSelectIris, testDB, modelDir, nil)
+		a.True(goodStream(stream.ReadAll()))
+
+		stream = runExtendedSQL(testXGPredSelectIris, testDB, modelDir, nil)
+		a.True(goodStream(stream.ReadAll()))
+	})
+}
+
 func TestExecutorTrainAndPredictDNN(t *testing.T) {
 	a := assert.New(t)
 	modelDir := ""
diff --git a/sql/python/sqlflow_submitter/xgboost/__init__.py b/sql/python/sqlflow_submitter/xgboost/__init__.py
index aa16ad420d..47dd54a5e7 100644
--- a/sql/python/sqlflow_submitter/xgboost/__init__.py
+++ b/sql/python/sqlflow_submitter/xgboost/__init__.py
@@ -10,3 +10,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from .sqlflow_data_source import SQLFlowDataSource, SQLFlowDSConfig
+from .sqlflow_xgboost_main import run_with_sqlflow
+from .common import XGBoostError
+
+__all__ = ['run_with_sqlflow', 'SQLFlowDataSource', 'SQLFlowDSConfig', 'XGBoostError']
diff --git a/sql/python/sqlflow_submitter/xgboost/sqlflow_xgboost_main.py b/sql/python/sqlflow_submitter/xgboost/sqlflow_xgboost_main.py
new file mode 100644
index 0000000000..83e2be4575
--- /dev/null
+++ b/sql/python/sqlflow_submitter/xgboost/sqlflow_xgboost_main.py
@@ -0,0 +1,70 @@
+# Copyright 2019 The SQLFlow Authors. All rights reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+
+from launcher import register_data_source, config_helper, config_fields as cf, train, predict
+
+from sqlflow_submitter.xgboost.common import XGBoostError
+from sqlflow_submitter.xgboost.sqlflow_data_source import SQLFlowDSConfig, SQLFlowDataSource
+
+register_data_source('sqlflow', SQLFlowDSConfig, SQLFlowDataSource)
+
+
+def run_with_sqlflow(mode: str,
+                     model_path: str,
+                     learning_config: str,
+                     data_source_config: str,
+                     column_config: str):
+    if mode not in (cf.JobType.TRAIN, cf.JobType.PREDICT):
+        raise XGBoostError('Unknown run mode(%s) of xgboost launcher.' % mode)
+    is_train = mode == cf.JobType.TRAIN
+
+    def parse_json_str(string: str):
+        return json.loads(string.replace("\n", " ").replace("\t", " "))
+
+    learning_fields = None
+    if is_train:
+        learning_config = parse_json_str(learning_config)
+        learning_fields = config_helper.load_config(cf.LearningFields, **learning_config)
+
+    data_source_config = parse_json_str(data_source_config)
+    ds_fields = cf.DataSourceFields('sqlflow', data_source_config)
+    column_config = parse_json_str(column_config)
+    col_fields = config_helper.load_config(cf.ColumnFields, **column_config)
+    # hard-code the prediction batch size to 1024
+    data_builder = cf.DataBuilderFields() if is_train else cf.DataBuilderFields(batch_size=1024)
+    data_fields = cf.DataFields(ds_fields, col_fields, data_builder)
+    bst_path = os.path.join(model_path, 'sqlflow_booster')
+    dump_fields = cf.DumpInfoFields(
+        path=os.path.join(model_path, 'sqlflow_booster.txt'),
+        with_stats=True,
+        is_dump_fscore=True)
+    model_fields = cf.ModelFields(model_path=bst_path, dump_conf=dump_fields)
+
+    if is_train:
+        try:
+            # create the model directory if missing, as tf.estimator does
+            if not os.path.exists(model_path):
+                os.makedirs(model_path)
+            train_fields = cf.TrainFields(learning_fields, data_fields, model_fields)
+            train(train_fields)
+        except Exception as e:
+            raise XGBoostError('XGBoost training task failed: %s' % e)
+    else:
+        try:
+            pred_fields = cf.PredictFields(data_fields, model_fields)
+            predict(pred_fields)
+        except Exception as e:
+            raise XGBoostError('XGBoost prediction task failed: %s' % e)
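As a usage sketch, the entry point added above can also be driven directly. The config strings below are abridged placeholders for the JSON that the Go codegen renders into the template, and the model path is hypothetical; failures on either path surface as XGBoostError.

```python
from launcher.config_fields import JobType
from sqlflow_submitter.xgboost import run_with_sqlflow, XGBoostError

try:
    run_with_sqlflow(
        mode=JobType.PREDICT,
        model_path='/tmp/sqlflow_models/my_xgboost_model',  # hypothetical local model dir
        learning_config='{}',   # only parsed in TRAIN mode
        # abridged placeholders; real values come from json.Marshal of the filler fields above
        data_source_config='{"is_train": false, "standard_select": "SELECT * FROM iris.test", "output_table": "iris.predict", "db_config": {"user": "root", "password": "root", "host": "127.0.0.1", "port": "3306", "database": "", "driver": "mysql"}}',
        column_config='{"columns": ["sepal_length", "sepal_width", "petal_length", "petal_width"], "is_sparse": false}')
except XGBoostError as e:
    print('xgboost job failed:', e)
```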