Merged
2 changes: 1 addition & 1 deletion scripts/image_build.sh
@@ -114,4 +114,4 @@ echo 'get_ipython().magic(u"%autoreload 2")' >> $IPYTHON_STARTUP/00-first.py
curl https://raw.githubusercontent.com/sql-machine-learning/sqlflow/develop/example/jupyter/example.ipynb --output /workspace/example.ipynb

# 9. install xgboost-launcher
pip install xgboost-launcher==0.0.1
pip install xgboost-launcher==0.0.3
148 changes: 123 additions & 25 deletions sql/codegen_xgboost.go
@@ -14,29 +14,31 @@
package sql

import (
"bytes"
"encoding/json"
"fmt"
"strconv"
"strings"
"text/template"

"github.com/go-sql-driver/mysql"
"sqlflow.org/gohive"
"sqlflow.org/gomaxcompute"
)

type xgboostFiller struct {
modelPath string
xgboostFields
ModelPath string
xgLearningFields
xgColumnFields
xgDataSourceFields
xgboostJSON string
xgDataSourceJSON string
xgColumnJSON string
LearningJSON string
DataSourceJSON string
ColumnJSON string
}

type xgboostFields struct {
type xgLearningFields struct {
NumRound uint `json:"num_boost_round,omitempty"`
AutoTrain bool `json:"auto_train,omitempty"`
AutoTrain bool `json:"auto_train"`
xgBoosterFields `json:"params,omitempty"`
}

@@ -74,30 +76,31 @@ type xgResultColumnFields struct {

type xgFeatureFields struct {
FeatureColumns []string `json:"columns,omitempty"`
IsSparse bool `json:"is_sparse,omitempty"`
IsSparse bool `json:"is_sparse"`
Delimiter string `json:"item_delimiter,omitempty"`
FeatureSize uint `json:"feature_num,omitempty"`
}

type xgDataSourceFields struct {
IsTrain bool `json:"is_train,omitempty"`
IsTrain bool `json:"is_train"`
StandardSelect string `json:"standard_select,omitempty"`
IsTensorFlowIntegrated bool `json:"is_tf_integrated,omitempty"`
IsTensorFlowIntegrated bool `json:"is_tf_integrated"`
X []*xgFeatureMeta `json:"x,omitempty"`
LabelField *xgFeatureMeta `json:"label,omitempty"`
WeightField *xgFeatureMeta `json:"weight,omitempty"`
GroupField *xgFeatureMeta `json:"group,omitempty"`
xgDataBaseField `json:"db_config,omitempty"`
WriteBatchSize int `json:"write_batch_size,omitempty"`
OutputTable string `json:"output_table,omitempty"`
WriteBatchSize int `json:"write_batch_size,omitempty"`
}

type xgDataBaseField struct {
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"`
Database string `json:"database,omitempty"`
Driver string `json:"driver,omitempty"`
User string `json:"user"`
Password string `json:"password"`
Host string `json:"host"`
Port string `json:"port"`
Database string `json:"database"`
Driver string `json:"driver"`
}

type xgFeatureMeta struct {
@@ -554,9 +557,10 @@ func xgParseEstimator(pr *extendedSelect, filler *xgboostFiller) error {
return nil
}

// TODO(sperlingxx): support trainAndValDataset
func newXGBoostFiller(pr *extendedSelect, fts fieldTypes, db *DB) (*xgboostFiller, error) {
filler := &xgboostFiller{
modelPath: pr.save,
ModelPath: pr.save,
}
filler.IsTrain = pr.train
filler.StandardSelect = pr.standardSelect.String()
@@ -565,10 +569,22 @@ func newXGBoostFiller(pr *extendedSelect, fts fieldTypes, db *DB) (*xgboostFille
if e := xgParseAttr(pr, filler); e != nil {
return nil, fmt.Errorf("failed to set xgboost attributes: %v", e)
}
// set default value of result column field in pred mode
if !pr.train && len(filler.ResultColumn) == 0 {
filler.ResultColumn = "result"
}

// solve keyword: TRAIN (estimator)
if e := xgParseEstimator(pr, filler); e != nil {
return nil, e
if pr.train {
// solve keyword: TRAIN (estimator)
if e := xgParseEstimator(pr, filler); e != nil {
return nil, e
}
} else {
// solve keyword: PREDICT (output_table)
if len(pr.into) == 0 {
return nil, fmt.Errorf("missing output table in xgboost prediction clause")
}
filler.OutputTable = pr.into
}

// solve keyword: COLUMN (column clauses)
Expand All @@ -582,23 +598,23 @@ func newXGBoostFiller(pr *extendedSelect, fts fieldTypes, db *DB) (*xgboostFille
}

// serialize fields
jsonBuffer, e := json.Marshal(filler.xgboostFields)
jsonBuffer, e := json.Marshal(filler.xgLearningFields)
if e != nil {
return nil, e
}
filler.xgboostJSON = string(jsonBuffer)
filler.LearningJSON = string(jsonBuffer)

jsonBuffer, e = json.Marshal(filler.xgDataSourceFields)
if e != nil {
return nil, e
}
filler.xgDataSourceJSON = string(jsonBuffer)
filler.DataSourceJSON = string(jsonBuffer)

jsonBuffer, e = json.Marshal(filler.xgColumnFields)
if e != nil {
return nil, e
}
filler.xgColumnJSON = string(jsonBuffer)
filler.ColumnJSON = string(jsonBuffer)

return filler, nil
}
@@ -639,3 +655,85 @@ func xgFillDatabaseInfo(r *xgboostFiller, db *DB) (*xgboostFiller, error) {
}
return r, nil
}

func xgCreatePredictionTable(pr *extendedSelect, r *xgboostFiller, db *DB) error {
dropStmt := fmt.Sprintf("drop table if exists %s;", r.OutputTable)
if _, e := db.Exec(dropStmt); e != nil {
return fmt.Errorf("failed executing %s: %q", dropStmt, e)
}

fts, e := verify(pr, db)
if e != nil {
return e
}

var b bytes.Buffer
fmt.Fprintf(&b, "create table %s (", r.OutputTable)
for _, col := range r.AppendColumns {
typ, ok := fts.get(col)
if !ok {
return fmt.Errorf("xgCreatePredictionTable: Cannot find type of field %s", col)
}
stype, e := universalizeColumnType(db.driverName, typ)
if e != nil {
return e
}
fmt.Fprintf(&b, "%s %s, ", col, stype)
}
// add prob column
if len(r.ProbColumn) > 0 {
stype, e := universalizeColumnType(db.driverName, "DOUBLE")
if e != nil {
return e
}
fmt.Fprintf(&b, "%s %s, ", r.ProbColumn, stype)
}
// add detail column
if len(r.DetailColumn) > 0 {
stype, e := universalizeColumnType(db.driverName, "VARCHAR")
if e != nil {
return e
}
fmt.Fprintf(&b, "%s %s, ", r.DetailColumn, stype)
}
// add encoding column
if len(r.EncodingColumn) > 0 {
Collaborator: What is the usage of the encoding column?

Contributor Author: The encoding column stores the leaf indices of the sample in each tree. We join the leaf indices into a string formatted like "index_0,index_1,...,index_n" (see the sketch after this function).

stype, e := universalizeColumnType(db.driverName, "VARCHAR")
if e != nil {
return e
}
fmt.Fprintf(&b, "%s %s, ", r.EncodingColumn, stype)
}
// add result column
stype, e := universalizeColumnType(db.driverName, "DOUBLE")
if e != nil {
return e
}
fmt.Fprintf(&b, "%s %s);", r.ResultColumn, stype)

createStmt := b.String()
if _, e := db.Exec(createStmt); e != nil {
return fmt.Errorf("failed executing %s: %q", createStmt, e)
}
return nil
}
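
As a side note on the review thread above, here is a minimal sketch of how such an encoding string could be produced, assuming one leaf index per tree. The helper name encodeLeafIndices is hypothetical and not part of this PR; it only uses strconv and strings, which codegen_xgboost.go already imports.

// encodeLeafIndices is a hypothetical helper that joins the leaf index of a
// sample in each tree into the comma-separated encoding-column format,
// e.g. []int{3, 7, 1} -> "3,7,1".
func encodeLeafIndices(leafIndices []int) string {
	parts := make([]string, 0, len(leafIndices))
	for _, idx := range leafIndices {
		parts = append(parts, strconv.Itoa(idx))
	}
	return strings.Join(parts, ",")
}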

var xgTemplate = template.Must(template.New("codegenXG").Parse(xgTemplateText))

const xgTemplateText = `
from launcher.config_fields import JobType
from sqlflow_submitter.xgboost import run_with_sqlflow

{{if .IsTrain}}
mode = JobType.TRAIN
{{else}}
mode = JobType.PREDICT
{{end}}

run_with_sqlflow(
mode=mode,
model_path='{{.ModelPath}}',
learning_config='{{.LearningJSON}}',
data_source_config='{{.DataSourceJSON}}',
column_config='{{.ColumnJSON}}')
`
43 changes: 36 additions & 7 deletions sql/codegen_xgboost_test.go
@@ -20,6 +20,35 @@ import (
"testing"
)

const (
testXGTrainSelectIris = `
SELECT *
FROM iris.train
TRAIN XGBoostEstimator
WITH
objective = "multi:softmax",
num_class = 3,
max_depth = 5,
eta = 0.3,
tree_method = "approx",
num_round = 30
COLUMN sepal_length, sepal_width, petal_length, petal_width
LABEL class INTO sqlflow_models.my_xgboost_model;
`

testXGPredSelectIris = `
SELECT *
FROM iris.test
PREDICT iris.predict
WITH
append_columns = [sepal_length, sepal_width, petal_length, petal_width],
Collaborator: Is it possible to avoid append_columns? A training job should memorize the field names later used for a prediction job.

Collaborator: Are prob_column and detail_column required? Can we add these columns by default, so the WITH statement can be shorter?

Contributor Author: prob_column, detail_column, leaf_column, and append_columns are optional. Only result_column is required for a prediction task, and it defaults to "result" (a minimal prediction clause is sketched after this constant).

Collaborator (@Yancey0623, Aug 15, 2019): Can we keep this consistent with the TF example and specify the result column in PREDICT iris.predict.result?

prob_column = prob,
detail_column = detail,
encoding_column = encoding
USING sqlflow_models.my_xgboost_model;
`
)
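
To illustrate the review answer above — only result_column is required, defaulting to "result" — a minimal prediction clause could omit the WITH attributes entirely. The constant below is a hypothetical sketch for illustration and is not part of this PR.

// testXGPredSelectIrisMinimal is a hypothetical minimal prediction clause:
// all optional columns are omitted, so only the default "result" column is written.
const testXGPredSelectIrisMinimal = `
SELECT *
FROM iris.test
PREDICT iris.predict
USING sqlflow_models.my_xgboost_model;
`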

func TestPartials(t *testing.T) {
a := assert.New(t)
tmpMap := make(map[string][]string)
@@ -127,7 +156,7 @@ COLUMN a, b, c, d
LABEL e INTO table_123;
`
filler := parseAndFill(trainClause)
data, e := json.Marshal(filler.xgboostFields)
data, e := json.Marshal(filler.xgLearningFields)
a.NoError(e)
mapData := make(map[string]interface{})
e = json.Unmarshal(data, &mapData)
@@ -302,7 +331,7 @@ LABEL e INTO model_table;
a.True(filler.IsTrain)
stdSlct := removeLastSemicolon(strings.Replace(filler.StandardSelect, "\n", " ", -1))
a.EqualValues("SELECT * FROM iris.train", stdSlct)
a.EqualValues("model_table", filler.modelPath)
a.EqualValues("model_table", filler.ModelPath)

a.EqualValues("reg:squarederror", filler.Objective)
a.EqualValues(0.03, filler.Eta)
@@ -328,15 +357,15 @@ LABEL e INTO model_table;
a.EqualValues(&xgFeatureMeta{FeatureName: "petal_width", Dtype: "float32", InputShape: "[1]"}, filler.X[3])

colFields := &xgColumnFields{}
e = json.Unmarshal([]byte(filler.xgColumnJSON), colFields)
e = json.Unmarshal([]byte(filler.ColumnJSON), colFields)
a.NoError(e)
a.EqualValues(filler.xgColumnFields, *colFields)
dsFields := &xgDataSourceFields{}
e = json.Unmarshal([]byte(filler.xgDataSourceJSON), dsFields)
e = json.Unmarshal([]byte(filler.DataSourceJSON), dsFields)
a.NoError(e)
a.EqualValues(filler.xgDataSourceFields, *dsFields)
xgbFields := &xgboostFields{}
e = json.Unmarshal([]byte(filler.xgboostJSON), xgbFields)
xgbFields := &xgLearningFields{}
e = json.Unmarshal([]byte(filler.LearningJSON), xgbFields)
a.NoError(e)
a.EqualValues(filler.xgboostFields, *xgbFields)
a.EqualValues(filler.xgLearningFields, *xgbFields)
}
4 changes: 4 additions & 0 deletions sql/column_type.go
@@ -186,6 +186,10 @@ func universalizeColumnType(driverName, dialectType string) (string, error) {
if strings.HasSuffix(dialectType, hiveCTypeSuffix) {
return dialectType[:len(dialectType)-len(hiveCTypeSuffix)], nil
}
// In Hive, a length is also required when defining a VARCHAR field, so we replace it with STRING.
if dialectType == "VARCHAR" {
return "STRING", nil
}
return dialectType, nil
}
return "", fmt.Errorf("not support driver:%s", driverName)
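
A hypothetical test sketch (not part of this PR) of the new VARCHAR-to-STRING mapping, following the assert style used in codegen_xgboost_test.go; it assumes the Hive driver is registered under the name "hive".

// TestUniversalizeVarcharOnHive is a hypothetical sketch: assuming the Hive
// driver name is "hive", a VARCHAR dialect type should be widened to STRING.
func TestUniversalizeVarcharOnHive(t *testing.T) {
	a := assert.New(t)
	stype, e := universalizeColumnType("hive", "VARCHAR")
	a.NoError(e)
	a.EqualValues("STRING", stype)
}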
40 changes: 32 additions & 8 deletions sql/executor.go
@@ -344,8 +344,19 @@ func train(wr *PipeWriter, tr *extendedSelect, db *DB, cwd string, modelDir stri
}

var program bytes.Buffer
if e := genTF(&program, tr, ds, fts, db); e != nil {
return fmt.Errorf("genTF %v", e)
// FIXME(sperlingxx): write a separate train pipeline for xgboost to support remote mode
if os.Getenv("SQLFLOW_submitter") == "XGBOOST" {
filler, e := newXGBoostFiller(tr, fts, db)
if e != nil {
return fmt.Errorf("genXG %v", e)
}
if e := xgTemplate.Execute(&program, filler); e != nil {
return fmt.Errorf("genXG %v", e)
}
} else {
if e := genTF(&program, tr, ds, fts, db); e != nil {
return fmt.Errorf("genTF %v", e)
}
}

cw := &logChanWriter{wr: wr}
@@ -387,19 +398,32 @@ func pred(wr *PipeWriter, pr *extendedSelect, db *DB, cwd string, modelDir strin
return fmt.Errorf("verifyColumnNameAndType: %v", e)
}

if e := createPredictionTable(tr, pr, db); e != nil {
return fmt.Errorf("createPredictionTable: %v", e)
}

pr.trainClause = tr.trainClause
fts, e := verify(pr, db)
if e != nil {
return fmt.Errorf("verify: %v", e)
}

var buf bytes.Buffer
if e := genTF(&buf, pr, nil, fts, db); e != nil {
return fmt.Errorf("genTF: %v", e)
// FIXME(sperlingxx): write a separate pred pipeline for xgboost to support remote mode
if os.Getenv("SQLFLOW_submitter") == "XGBOOST" {
filler, e := newXGBoostFiller(pr, fts, db)
if e != nil {
return fmt.Errorf("genXG %v", e)
}
if e := xgCreatePredictionTable(pr, filler, db); e != nil {
return fmt.Errorf("genXG %v", e)
}
if e := xgTemplate.Execute(&buf, filler); e != nil {
return fmt.Errorf("genXG %v", e)
}
} else {
if e := createPredictionTable(tr, pr, db); e != nil {
return fmt.Errorf("createPredictionTable: %v", e)
}
if e := genTF(&buf, pr, nil, fts, db); e != nil {
return fmt.Errorf("genTF %v", e)
}
}

cw := &logChanWriter{wr: wr}