From 2d6973d8fc1e94e0b2fe11a6c5de7287e09aedbc Mon Sep 17 00:00:00 2001 From: asmyasnikov <79263256394@ya.ru> Date: Wed, 9 Nov 2022 12:58:28 +0300 Subject: [PATCH 1/2] WIP --- migration/README.md | 3 + migration/main.go | 159 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 162 insertions(+) create mode 100644 migration/README.md create mode 100644 migration/main.go diff --git a/migration/README.md b/migration/README.md new file mode 100644 index 0000000..43acaeb --- /dev/null +++ b/migration/README.md @@ -0,0 +1,3 @@ +# Migration example + +Example demonstrates migration of table schema. \ No newline at end of file diff --git a/migration/main.go b/migration/main.go new file mode 100644 index 0000000..1e7a9ee --- /dev/null +++ b/migration/main.go @@ -0,0 +1,159 @@ +package main + +import ( + "context" + "flag" + "fmt" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/table/options" + "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "os" + "path" + "strconv" + + environ "github.com/ydb-platform/ydb-go-sdk-auth-environ" + ydb "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/sugar" +) + +var ( + dsn string + prefix string +) + +func init() { + required := []string{"ydb"} + flagSet := flag.NewFlagSet(os.Args[0], flag.ExitOnError) + flagSet.Usage = func() { + out := flagSet.Output() + _, _ = fmt.Fprintf(out, "Usage:\n%s [options]\n", os.Args[0]) + _, _ = fmt.Fprintf(out, "\nOptions:\n") + flagSet.PrintDefaults() + } + flagSet.StringVar(&dsn, + "ydb", "", + "YDB connection string", + ) + flagSet.StringVar(&prefix, + "prefix", "", + "tables prefix", + ) + if err := flagSet.Parse(os.Args[1:]); err != nil { + flagSet.Usage() + os.Exit(1) + } + flagSet.Visit(func(f *flag.Flag) { + for i, arg := range required { + if arg == f.Name { + required = append(required[:i], required[i+1:]...) + } + } + }) + if len(required) > 0 { + fmt.Printf("\nSome required options not defined: %v\n\n", required) + flagSet.Usage() + os.Exit(1) + } +} + +func createTable(ctx context.Context, c table.Client, prefix string) (err error) { + return c.Do(ctx, + func(ctx context.Context, s table.Session) error { + return s.CreateTable(ctx, path.Join(prefix, "series"), + options.WithColumn("series_id", types.Optional(types.TypeUint64)), + options.WithColumn("title", types.Optional(types.TypeUTF8)), + options.WithColumn("series_info", types.Optional(types.TypeUTF8)), + options.WithColumn("release_date", types.Optional(types.TypeUint64)), + options.WithColumn("comment", types.Optional(types.TypeUTF8)), + options.WithPrimaryKeyColumn("series_id"), + ) + }, + ) +} + +func tableVersion(ctx context.Context, c table.Client, prefix string) (version int, _ error) { + err := c.Do(ctx, + func(ctx context.Context, s table.Session) error { + description, err := s.DescribeTable(ctx, path.Join(prefix, "series")) + if err != nil { + if ydb.IsOperationErrorSchemeError(err) { + version = -1 + return nil + } + return err + } + if v, has := description.Attributes["version"]; has { + version, err = strconv.Atoi(v) + if err != nil { + return err + } + } + return nil + }, + ) + return version, err +} + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + opts := []ydb.Option{ + environ.WithEnvironCredentials(ctx), + } + db, err := ydb.Open(ctx, dsn, opts...) + if err != nil { + panic(fmt.Errorf("connect error: %w", err)) + } + defer func() { _ = db.Close(ctx) }() + + prefix = path.Join(db.Name(), prefix) + + err = sugar.RemoveRecursive(ctx, db, prefix) + if err != nil { + panic(err) + } + + err = sugar.MakeRecursive(ctx, db, prefix) + if err != nil { + panic(err) + } + + err = describeTableOptions(ctx, db.Table()) + if err != nil { + panic(fmt.Errorf("describe table options error: %w", err)) + } + + err = createTables(ctx, db.Table(), prefix) + if err != nil { + panic(fmt.Errorf("create tables error: %w", err)) + } + + err = describeTable(ctx, db.Table(), path.Join( + prefix, "series", + )) + if err != nil { + panic(fmt.Errorf("describe table error: %w", err)) + } + + err = fillTablesWithData(ctx, db.Table(), prefix) + if err != nil { + panic(fmt.Errorf("fill tables with data error: %w", err)) + } + + err = selectSimple(ctx, db.Table(), prefix) + if err != nil { + panic(fmt.Errorf("select simple error: %w", err)) + } + + err = scanQuerySelect(ctx, db.Table(), prefix) + if err != nil { + panic(fmt.Errorf("scan query select error: %w", err)) + } + + err = readTable(ctx, db.Table(), path.Join( + prefix, "series", + )) + if err != nil { + panic(fmt.Errorf("read table error: %w", err)) + } +} From db5772492d25e65a59d9207976c74d47e8b471ed Mon Sep 17 00:00:00 2001 From: asmyasnikov <79263256394@ya.ru> Date: Fri, 18 Nov 2022 18:11:21 +0300 Subject: [PATCH 2/2] update dependency + fixes --- basic/series.go | 3 +++ go.mod | 2 +- go.sum | 2 ++ topic/cdc-cache-bus-freeseats/database.go | 23 ++++++++++++++--------- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/basic/series.go b/basic/series.go index 71672c3..09435e9 100644 --- a/basic/series.go +++ b/basic/series.go @@ -338,6 +338,7 @@ func createTables(ctx context.Context, c table.Client, prefix string) (err error options.WithPrimaryKeyColumn("series_id"), ) }, + table.WithIdempotent(), ) if err != nil { return err @@ -354,6 +355,7 @@ func createTables(ctx context.Context, c table.Client, prefix string) (err error options.WithPrimaryKeyColumn("series_id", "season_id"), ) }, + table.WithIdempotent(), ) if err != nil { return err @@ -370,6 +372,7 @@ func createTables(ctx context.Context, c table.Client, prefix string) (err error options.WithPrimaryKeyColumn("series_id", "season_id", "episode_id"), ) }, + table.WithIdempotent(), ) if err != nil { return err diff --git a/go.mod b/go.mod index be20ca8..d57fc9a 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2 github.com/ydb-platform/ydb-go-sdk-prometheus v0.11.10 github.com/ydb-platform/ydb-go-sdk-zerolog v0.12.2 - github.com/ydb-platform/ydb-go-sdk/v3 v3.39.0 + github.com/ydb-platform/ydb-go-sdk/v3 v3.40.0 github.com/ydb-platform/ydb-go-yc v0.9.1 google.golang.org/genproto v0.0.0-20220822174746-9e6da59bd2fc ) diff --git a/go.sum b/go.sum index 28cd337..42af34d 100644 --- a/go.sum +++ b/go.sum @@ -259,6 +259,8 @@ github.com/ydb-platform/ydb-go-sdk/v3 v3.25.3/go.mod h1:PFizF/vJsdAgEwjK3DVSBD52 github.com/ydb-platform/ydb-go-sdk/v3 v3.35.1/go.mod h1:eD5OyVA8MuMq3+BYBMKGUfa2faTZhbx+LE+y1RgitFE= github.com/ydb-platform/ydb-go-sdk/v3 v3.39.0 h1:fGRZa9mGacXBG83BhgcP6i4BPVXHRQfxFoqqleEpHCU= github.com/ydb-platform/ydb-go-sdk/v3 v3.39.0/go.mod h1:hJqWSE2NZ2o2c9geHtRJee+xwiHgEfQX9koBZPLTfHY= +github.com/ydb-platform/ydb-go-sdk/v3 v3.40.0 h1:2ZpiP9RuGUi1qGNlleWTXS9ZeT/6WLjmrPQMNWJkQvw= +github.com/ydb-platform/ydb-go-sdk/v3 v3.40.0/go.mod h1:hJqWSE2NZ2o2c9geHtRJee+xwiHgEfQX9koBZPLTfHY= github.com/ydb-platform/ydb-go-yc v0.8.3/go.mod h1:zUolAFGzJ5XG8uwiseTLr9Lapm7L7hdVdZgLSuv9FXE= github.com/ydb-platform/ydb-go-yc v0.9.1 h1:rEySP+jdog0J+DAOYn5u3ggdciUuvwNNuQwVlaXm9Jw= github.com/ydb-platform/ydb-go-yc v0.9.1/go.mod h1:zUolAFGzJ5XG8uwiseTLr9Lapm7L7hdVdZgLSuv9FXE= diff --git a/topic/cdc-cache-bus-freeseats/database.go b/topic/cdc-cache-bus-freeseats/database.go index d650e5e..c4446c7 100644 --- a/topic/cdc-cache-bus-freeseats/database.go +++ b/topic/cdc-cache-bus-freeseats/database.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "github.com/ydb-platform/ydb-go-sdk/v3/sugar" "log" "os" "path" @@ -27,18 +28,22 @@ func createTableAndCDC(ctx context.Context, db ydb.Connection, consumersCount in } func createTables(ctx context.Context, db ydb.Connection) error { - err := db.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - err := s.DropTable(ctx, path.Join(db.Name(), "bus")) - if ydb.IsOperationErrorSchemeError(err) { - err = nil + if exists, err := sugar.IsTableExists(ctx, db.Scheme(), path.Join(db.Name(), "bus")); exists && err == nil { + err = db.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + err := s.DropTable(ctx, path.Join(db.Name(), "bus")) + if ydb.IsOperationErrorSchemeError(err) { + err = nil + } + return err + }) + if err != nil { + return fmt.Errorf("failed to drop table: %w", err) } - return err - }) - if err != nil { - return fmt.Errorf("failed to drop table: %w", err) + } else if err != nil { + fmt.Errorf("failed to check table existence: %w", err) } - _, err = db.Scripting().Execute(ctx, ` + _, err := db.Scripting().Execute(ctx, ` CREATE TABLE bus (id Utf8, freeSeats Int64, PRIMARY KEY(id)); ALTER TABLE