Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# _Design:_ xgboost on sqlflow
# _Design:_ ant-xgboost on sqlflow

## Overview

This is a design doc about why and how to support running xgboost via sqlflow as a machine learning estimator.
This is a design doc about why and how to support running ant-xgboost via sqlflow as a machine learning estimator.

We propose to build a lightweight python template for xgboost on basis of `xgblauncher`,
an incubating xgboost wrapper in [ant-xgboost](https://github.com/alipay/ant-xgboost).
Expand Down
4 changes: 2 additions & 2 deletions scripts/test_e2e.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ export PYTHONPATH=$GOPATH/src/github.com/sql-machine-learning/sqlflow/sql/python
sqlflowserver --datasource=${DATASOURCE} &
# e2e test for standard SQL
SQLFLOW_SERVER=localhost:50051 ipython sql/python/test_magic.py
# e2e test for xgboost train and prediciton SQL.
SQLFLOW_SERVER=localhost:50051 ipython sql/python/test_magic_xgboost.py
# e2e test for ant-xgboost train and prediciton SQL.
SQLFLOW_SERVER=localhost:50051 ipython sql/python/test_magic_ant_xgboost.py
# TODO(terrytangyuan): Enable this when ElasticDL is open sourced
# e2e test for ElasticDL SQL
# export SQLFLOW_submitter=elasticdl
Expand Down
2 changes: 1 addition & 1 deletion sql/codegen_analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func readFeatureNames(pr *extendedSelect, db *DB) ([]string, string, error) {
if strings.HasPrefix(strings.ToUpper(pr.estimator), `XGBOOST.`) {
// TODO(weiguo): It's a quick way to read column and label names from
// xgboost.*, but too heavy.
xgbFiller, err := newXGBoostFiller(pr, nil, db)
xgbFiller, err := newAntXGBoostFiller(pr, nil, db)
if err != nil {
return nil, "", err
}
Expand Down
116 changes: 58 additions & 58 deletions sql/codegen_xgboost.go → sql/codegen_ant_xgboost.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"sqlflow.org/gomaxcompute"
)

type xgboostFiller struct {
type antXGBoostFiller struct {
ModelPath string
xgLearningFields
xgColumnFields
Expand Down Expand Up @@ -164,15 +164,15 @@ func xgMultiSparseError(colNames []string) error {
}

func xgUnknownFCError(kw string) error {
return fmt.Errorf("xgUnknownFCError: feature column keyword(`%s`) is not supported by xgboost engine", kw)
return fmt.Errorf("xgUnknownFCError: feature column keyword(`%s`) is not supported by ant-xgboost engine", kw)
}

func xgUnsupportedColTagError() error {
return fmt.Errorf("xgUnsupportedColTagError: valid column tags of xgboost engine([feature_columns, group, weight])")
return fmt.Errorf("xgUnsupportedColTagError: valid column tags of ant-xgboost engine([feature_columns, group, weight])")
}

func uIntPartial(key string, ptrFn func(*xgboostFiller) *uint) func(*map[string][]string, *xgboostFiller) error {
return func(a *map[string][]string, r *xgboostFiller) error {
func uIntPartial(key string, ptrFn func(*antXGBoostFiller) *uint) func(*map[string][]string, *antXGBoostFiller) error {
return func(a *map[string][]string, r *antXGBoostFiller) error {
// xgParseAttr will ensure the key is existing in map
val, _ := (*a)[key]
if len(val) != 1 {
Expand All @@ -190,8 +190,8 @@ func uIntPartial(key string, ptrFn func(*xgboostFiller) *uint) func(*map[string]
}
}

func fp32Partial(key string, ptrFn func(*xgboostFiller) *float32) func(*map[string][]string, *xgboostFiller) error {
return func(a *map[string][]string, r *xgboostFiller) error {
func fp32Partial(key string, ptrFn func(*antXGBoostFiller) *float32) func(*map[string][]string, *antXGBoostFiller) error {
return func(a *map[string][]string, r *antXGBoostFiller) error {
// xgParseAttr will ensure the key is existing in map
val, _ := (*a)[key]
if len(val) != 1 {
Expand All @@ -209,8 +209,8 @@ func fp32Partial(key string, ptrFn func(*xgboostFiller) *float32) func(*map[stri
}
}

func boolPartial(key string, ptrFn func(*xgboostFiller) *bool) func(*map[string][]string, *xgboostFiller) error {
return func(a *map[string][]string, r *xgboostFiller) error {
func boolPartial(key string, ptrFn func(*antXGBoostFiller) *bool) func(*map[string][]string, *antXGBoostFiller) error {
return func(a *map[string][]string, r *antXGBoostFiller) error {
// xgParseAttr will ensure the key is existing in map
val, _ := (*a)[key]
if len(val) != 1 {
Expand All @@ -227,8 +227,8 @@ func boolPartial(key string, ptrFn func(*xgboostFiller) *bool) func(*map[string]
}
}

func strPartial(key string, ptrFn func(*xgboostFiller) *string) func(*map[string][]string, *xgboostFiller) error {
return func(a *map[string][]string, r *xgboostFiller) error {
func strPartial(key string, ptrFn func(*antXGBoostFiller) *string) func(*map[string][]string, *antXGBoostFiller) error {
return func(a *map[string][]string, r *antXGBoostFiller) error {
// xgParseAttr will ensure the key is existing in map
val, _ := (*a)[key]
if len(val) != 1 {
Expand All @@ -244,8 +244,8 @@ func strPartial(key string, ptrFn func(*xgboostFiller) *string) func(*map[string
}
}

func sListPartial(key string, ptrFn func(*xgboostFiller) *[]string) func(*map[string][]string, *xgboostFiller) error {
return func(a *map[string][]string, r *xgboostFiller) error {
func sListPartial(key string, ptrFn func(*antXGBoostFiller) *[]string) func(*map[string][]string, *antXGBoostFiller) error {
return func(a *map[string][]string, r *antXGBoostFiller) error {
// xgParseAttr will ensure the key is existing in map
val, _ := (*a)[key]
strListPtr := ptrFn(r)
Expand All @@ -258,48 +258,48 @@ func sListPartial(key string, ptrFn func(*xgboostFiller) *[]string) func(*map[st
}
}

var xgbTrainAttrSetterMap = map[string]func(*map[string][]string, *xgboostFiller) error{
var xgbTrainAttrSetterMap = map[string]func(*map[string][]string, *antXGBoostFiller) error{
// booster params
"train.objective": strPartial("train.objective", func(r *xgboostFiller) *string { return &(r.Objective) }),
"train.eval_metric": strPartial("train.eval_metric", func(r *xgboostFiller) *string { return &(r.EvalMetric) }),
"train.booster": strPartial("train.booster", func(r *xgboostFiller) *string { return &(r.Booster) }),
"train.seed": uIntPartial("train.seed", func(r *xgboostFiller) *uint { return &(r.Seed) }),
"train.num_class": uIntPartial("train.num_class", func(r *xgboostFiller) *uint { return &(r.NumClass) }),
"train.eta": fp32Partial("train.eta", func(r *xgboostFiller) *float32 { return &(r.Eta) }),
"train.gamma": fp32Partial("train.gamma", func(r *xgboostFiller) *float32 { return &(r.Gamma) }),
"train.max_depth": uIntPartial("train.max_depth", func(r *xgboostFiller) *uint { return &(r.MaxDepth) }),
"train.min_child_weight": uIntPartial("train.min_child_weight", func(r *xgboostFiller) *uint { return &(r.MinChildWeight) }),
"train.subsample": fp32Partial("train.subsample", func(r *xgboostFiller) *float32 { return &(r.Subsample) }),
"train.colsample_bytree": fp32Partial("train.colsample_bytree", func(r *xgboostFiller) *float32 { return &(r.ColSampleByTree) }),
"train.colsample_bylevel": fp32Partial("train.colsample_bylevel", func(r *xgboostFiller) *float32 { return &(r.ColSampleByLevel) }),
"train.colsample_bynode": fp32Partial("train.colsample_bynode", func(r *xgboostFiller) *float32 { return &(r.ColSampleByNode) }),
"train.lambda": fp32Partial("train.lambda", func(r *xgboostFiller) *float32 { return &(r.Lambda) }),
"train.alpha": fp32Partial("train.alpha", func(r *xgboostFiller) *float32 { return &(r.Alpha) }),
"train.tree_method": strPartial("train.tree_method", func(r *xgboostFiller) *string { return &(r.TreeMethod) }),
"train.sketch_eps": fp32Partial("train.sketch_eps", func(r *xgboostFiller) *float32 { return &(r.SketchEps) }),
"train.scale_pos_weight": fp32Partial("train.scale_pos_weight", func(r *xgboostFiller) *float32 { return &(r.ScalePosWeight) }),
"train.grow_policy": strPartial("train.grow_policy", func(r *xgboostFiller) *string { return &(r.GrowPolicy) }),
"train.max_leaves": uIntPartial("train.max_leaves", func(r *xgboostFiller) *uint { return &(r.MaxLeaves) }),
"train.max_bin": uIntPartial("train.max_bin", func(r *xgboostFiller) *uint { return &(r.MaxBin) }),
"train.num_parallel_tree": uIntPartial("train.num_parallel_tree", func(r *xgboostFiller) *uint { return &(r.NumParallelTree) }),
"train.convergence_criteria": strPartial("train.convergence_criteria", func(r *xgboostFiller) *string { return &(r.ConvergenceCriteria) }),
"train.verbosity": uIntPartial("train.verbosity", func(r *xgboostFiller) *uint { return &(r.Verbosity) }),
"train.objective": strPartial("train.objective", func(r *antXGBoostFiller) *string { return &(r.Objective) }),
"train.eval_metric": strPartial("train.eval_metric", func(r *antXGBoostFiller) *string { return &(r.EvalMetric) }),
"train.booster": strPartial("train.booster", func(r *antXGBoostFiller) *string { return &(r.Booster) }),
"train.seed": uIntPartial("train.seed", func(r *antXGBoostFiller) *uint { return &(r.Seed) }),
"train.num_class": uIntPartial("train.num_class", func(r *antXGBoostFiller) *uint { return &(r.NumClass) }),
"train.eta": fp32Partial("train.eta", func(r *antXGBoostFiller) *float32 { return &(r.Eta) }),
"train.gamma": fp32Partial("train.gamma", func(r *antXGBoostFiller) *float32 { return &(r.Gamma) }),
"train.max_depth": uIntPartial("train.max_depth", func(r *antXGBoostFiller) *uint { return &(r.MaxDepth) }),
"train.min_child_weight": uIntPartial("train.min_child_weight", func(r *antXGBoostFiller) *uint { return &(r.MinChildWeight) }),
"train.subsample": fp32Partial("train.subsample", func(r *antXGBoostFiller) *float32 { return &(r.Subsample) }),
"train.colsample_bytree": fp32Partial("train.colsample_bytree", func(r *antXGBoostFiller) *float32 { return &(r.ColSampleByTree) }),
"train.colsample_bylevel": fp32Partial("train.colsample_bylevel", func(r *antXGBoostFiller) *float32 { return &(r.ColSampleByLevel) }),
"train.colsample_bynode": fp32Partial("train.colsample_bynode", func(r *antXGBoostFiller) *float32 { return &(r.ColSampleByNode) }),
"train.lambda": fp32Partial("train.lambda", func(r *antXGBoostFiller) *float32 { return &(r.Lambda) }),
"train.alpha": fp32Partial("train.alpha", func(r *antXGBoostFiller) *float32 { return &(r.Alpha) }),
"train.tree_method": strPartial("train.tree_method", func(r *antXGBoostFiller) *string { return &(r.TreeMethod) }),
"train.sketch_eps": fp32Partial("train.sketch_eps", func(r *antXGBoostFiller) *float32 { return &(r.SketchEps) }),
"train.scale_pos_weight": fp32Partial("train.scale_pos_weight", func(r *antXGBoostFiller) *float32 { return &(r.ScalePosWeight) }),
"train.grow_policy": strPartial("train.grow_policy", func(r *antXGBoostFiller) *string { return &(r.GrowPolicy) }),
"train.max_leaves": uIntPartial("train.max_leaves", func(r *antXGBoostFiller) *uint { return &(r.MaxLeaves) }),
"train.max_bin": uIntPartial("train.max_bin", func(r *antXGBoostFiller) *uint { return &(r.MaxBin) }),
"train.num_parallel_tree": uIntPartial("train.num_parallel_tree", func(r *antXGBoostFiller) *uint { return &(r.NumParallelTree) }),
"train.convergence_criteria": strPartial("train.convergence_criteria", func(r *antXGBoostFiller) *string { return &(r.ConvergenceCriteria) }),
"train.verbosity": uIntPartial("train.verbosity", func(r *antXGBoostFiller) *uint { return &(r.Verbosity) }),
// xgboost train controllers
"train.num_round": uIntPartial("train.num_round", func(r *xgboostFiller) *uint { return &(r.NumRound) }),
"train.auto_train": boolPartial("train.auto_train", func(r *xgboostFiller) *bool { return &(r.AutoTrain) }),
"train.num_round": uIntPartial("train.num_round", func(r *antXGBoostFiller) *uint { return &(r.NumRound) }),
"train.auto_train": boolPartial("train.auto_train", func(r *antXGBoostFiller) *bool { return &(r.AutoTrain) }),
// Label, Group, Weight and xgFeatureFields are parsed from columnClause
}

var xgbPredAttrSetterMap = map[string]func(*map[string][]string, *xgboostFiller) error{
var xgbPredAttrSetterMap = map[string]func(*map[string][]string, *antXGBoostFiller) error{
// xgboost output columns (for prediction)
"pred.append_columns": sListPartial("pred.append_columns", func(r *xgboostFiller) *[]string { return &(r.AppendColumns) }),
"pred.prob_column": strPartial("pred.prob_column", func(r *xgboostFiller) *string { return &(r.ProbColumn) }),
"pred.detail_column": strPartial("pred.detail_column", func(r *xgboostFiller) *string { return &(r.DetailColumn) }),
"pred.encoding_column": strPartial("pred.encoding_column", func(r *xgboostFiller) *string { return &(r.EncodingColumn) }),
"pred.append_columns": sListPartial("pred.append_columns", func(r *antXGBoostFiller) *[]string { return &(r.AppendColumns) }),
"pred.prob_column": strPartial("pred.prob_column", func(r *antXGBoostFiller) *string { return &(r.ProbColumn) }),
"pred.detail_column": strPartial("pred.detail_column", func(r *antXGBoostFiller) *string { return &(r.DetailColumn) }),
"pred.encoding_column": strPartial("pred.encoding_column", func(r *antXGBoostFiller) *string { return &(r.EncodingColumn) }),
// Label, Group, Weight and xgFeatureFields are parsed from columnClause
}

func xgParseAttr(pr *extendedSelect, r *xgboostFiller) error {
func xgParseAttr(pr *extendedSelect, r *antXGBoostFiller) error {
var rawAttrs map[string]*expr
if pr.train {
rawAttrs = pr.trainAttrs
Expand All @@ -324,8 +324,8 @@ func xgParseAttr(pr *extendedSelect, r *xgboostFiller) error {
}
}

// fill xgboostFiller with attrs
var setterMap map[string]func(*map[string][]string, *xgboostFiller) error
// fill antXGBoostFiller with attrs
var setterMap map[string]func(*map[string][]string, *antXGBoostFiller) error
if pr.train {
setterMap = xgbTrainAttrSetterMap
} else {
Expand Down Expand Up @@ -358,7 +358,7 @@ func xgParseAttr(pr *extendedSelect, r *xgboostFiller) error {
// data example: COLUMN SPARSE("0:1.5 1:100.1f 11:-1.2", [20], " ")
// 2. tf feature columns
// Roughly same as TFEstimator, except output shape of feaColumns are required to be 1-dim.
func parseFeatureColumns(columns *exprlist, r *xgboostFiller) error {
func parseFeatureColumns(columns *exprlist, r *antXGBoostFiller) error {
feaCols, colSpecs, err := resolveTrainColumns(columns)
if err != nil {
return err
Expand All @@ -379,7 +379,7 @@ func parseFeatureColumns(columns *exprlist, r *xgboostFiller) error {

// parseSparseKeyValueFeatures, parse features which is identified by `SPARSE`.
// ex: SPARSE(col1, [100], comma)
func parseSparseKeyValueFeatures(colSpecs []*columnSpec, r *xgboostFiller) error {
func parseSparseKeyValueFeatures(colSpecs []*columnSpec, r *antXGBoostFiller) error {
var colNames []string
for _, spec := range colSpecs {
colNames = append(colNames, spec.ColumnName)
Expand Down Expand Up @@ -425,7 +425,7 @@ func isSimpleColumn(col featureColumn) bool {
return false
}

func parseDenseFeatures(feaCols []featureColumn, r *xgboostFiller) error {
func parseDenseFeatures(feaCols []featureColumn, r *antXGBoostFiller) error {
allSimpleCol := true
for _, col := range feaCols {
if allSimpleCol && !isSimpleColumn(col) {
Expand Down Expand Up @@ -511,7 +511,7 @@ func parseSimpleColumn(field string, columns *exprlist) (*xgFeatureMeta, error)
return fm, nil
}

func xgParseColumns(pr *extendedSelect, filler *xgboostFiller) error {
func xgParseColumns(pr *extendedSelect, filler *antXGBoostFiller) error {
for target, columns := range pr.columns {
switch target {
case "feature_columns":
Expand Down Expand Up @@ -553,7 +553,7 @@ func xgParseColumns(pr *extendedSelect, filler *xgboostFiller) error {
return nil
}

func xgParseEstimator(pr *extendedSelect, filler *xgboostFiller) error {
func xgParseEstimator(pr *extendedSelect, filler *antXGBoostFiller) error {
switch strings.ToUpper(pr.estimator) {
case "XGBOOST.ESTIMATOR":
if len(filler.Objective) == 0 {
Expand Down Expand Up @@ -590,8 +590,8 @@ func xgParseEstimator(pr *extendedSelect, filler *xgboostFiller) error {
return nil
}

func newXGBoostFiller(pr *extendedSelect, ds *trainAndValDataset, db *DB) (*xgboostFiller, error) {
filler := &xgboostFiller{
func newAntXGBoostFiller(pr *extendedSelect, ds *trainAndValDataset, db *DB) (*antXGBoostFiller, error) {
filler := &antXGBoostFiller{
ModelPath: pr.save,
}
filler.IsTrain = pr.train
Expand Down Expand Up @@ -720,7 +720,7 @@ func xgFillDatabaseInfo(r *xgDataSourceFields, db *DB) error {
return nil
}

func xgCreatePredictionTable(pr *extendedSelect, r *xgboostFiller, db *DB) error {
func xgCreatePredictionTable(pr *extendedSelect, r *antXGBoostFiller, db *DB) error {
dropStmt := fmt.Sprintf("drop table if exists %s;", r.OutputTable)
if _, e := db.Exec(dropStmt); e != nil {
return fmt.Errorf("failed executing %s: %q", dropStmt, e)
Expand Down Expand Up @@ -791,7 +791,7 @@ func xgCreatePredictionTable(pr *extendedSelect, r *xgboostFiller, db *DB) error
}

func genXG(w io.Writer, pr *extendedSelect, ds *trainAndValDataset, fts fieldTypes, db *DB) error {
r, e := newXGBoostFiller(pr, ds, db)
r, e := newAntXGBoostFiller(pr, ds, db)
if e != nil {
return e
}
Expand All @@ -808,7 +808,7 @@ var xgTemplate = template.Must(template.New("codegenXG").Parse(xgTemplateText))

const xgTemplateText = `
from launcher.config_fields import JobType
from sqlflow_submitter.xgboost import run_with_sqlflow
from sqlflow_submitter.ant_xgboost import run_with_sqlflow

{{if .IsTrain}}
mode = JobType.TRAIN
Expand Down
Loading