diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go index c27fb6b3..0c2b7260 100644 --- a/pkg/ccr/base/spec.go +++ b/pkg/ccr/base/spec.go @@ -44,6 +44,11 @@ func (s BackupState) String() string { } } +func formatKeyWordName(name string) string { + return "`" + strings.TrimSpace(name) + "`" + +} + func ParseBackupState(state string) BackupState { switch state { case "PENDING": @@ -196,7 +201,7 @@ func (s *Spec) IsDatabaseEnableBinlog() (bool, error) { } var createDBString string - query := fmt.Sprintf("SHOW CREATE DATABASE %s", s.Database) + query := fmt.Sprintf("SHOW CREATE DATABASE %s", formatKeyWordName(s.Database)) rows, err := db.Query(query) if err != nil { return false, xerror.Wrap(err, xerror.Normal, query) @@ -234,7 +239,7 @@ func (s *Spec) IsTableEnableBinlog() (bool, error) { } var createTableString string - query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", s.Database, s.Table) + query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", formatKeyWordName(s.Database), formatKeyWordName(s.Table)) rows, err := db.Query(query) if err != nil { return false, xerror.Wrap(err, xerror.Normal, query) @@ -300,7 +305,7 @@ func (s *Spec) dropTable(table string) error { return err } - sql := fmt.Sprintf("DROP TABLE %s.%s", s.Database, table) + sql := fmt.Sprintf("DROP TABLE %s.%s", formatKeyWordName(s.Database), formatKeyWordName(table)) _, err = db.Exec(sql) if err != nil { return xerror.Wrapf(err, xerror.Normal, "drop table %s.%s failed, sql: %s", s.Database, table, sql) @@ -316,13 +321,13 @@ func (s *Spec) ClearDB() error { return err } - sql := fmt.Sprintf("DROP DATABASE %s", s.Database) + sql := fmt.Sprintf("DROP DATABASE %s", formatKeyWordName(s.Database)) _, err = db.Exec(sql) if err != nil { return xerror.Wrapf(err, xerror.Normal, "drop database %s failed", s.Database) } - if _, err = db.Exec("CREATE DATABASE " + s.Database); err != nil { + if _, err = db.Exec("CREATE DATABASE " + formatKeyWordName(s.Database)); err != nil { return xerror.Wrapf(err, xerror.Normal, "create database %s failed", s.Database) } return nil @@ -336,7 +341,7 @@ func (s *Spec) CreateDatabase() error { return nil } - if _, err = db.Exec("CREATE DATABASE IF NOT EXISTS " + s.Database); err != nil { + if _, err = db.Exec("CREATE DATABASE IF NOT EXISTS " + formatKeyWordName(s.Database)); err != nil { return xerror.Wrapf(err, xerror.Normal, "create database %s failed", s.Database) } return nil @@ -396,7 +401,7 @@ func (s *Spec) CheckTableExists() (bool, error) { return false, err } - sql := fmt.Sprintf("SHOW TABLES FROM %s LIKE '%s'", s.Database, s.Table) + sql := fmt.Sprintf("SHOW TABLES FROM %s LIKE '%s'", formatKeyWordName(s.Database), s.Table) rows, err := db.Query(sql) if err != nil { return false, xerror.Wrapf(err, xerror.Normal, "show tables failed, sql: %s", sql) @@ -436,12 +441,17 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) { // snapshot name format "ccrs_${table}_${timestamp}" // table refs = table snapshotName = fmt.Sprintf("ccrs_%s_%s_%d", s.Database, s.Table, time.Now().Unix()) - tableRefs = tables[0] + tableRefs = formatKeyWordName(tables[0]) } else { // snapshot name format "ccrs_${db}_${timestamp}" // table refs = tables.join(", ") snapshotName = fmt.Sprintf("ccrs_%s_%d", s.Database, time.Now().Unix()) - tableRefs = strings.Join(tables, ", ") + tableRefs = "`" + strings.Join(tables, "`,`") + "`" + } + + // means source is a empty db, table numer is 0 + if tableRefs == "``" { + return "", xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table") } log.Infof("create snapshot %s.%s", s.Database, snapshotName) @@ -451,7 +461,7 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) { return "", err } - backupSnapshotSql := fmt.Sprintf("BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON ( %s ) PROPERTIES (\"type\" = \"full\")", s.Database, snapshotName, tableRefs) + backupSnapshotSql := fmt.Sprintf("BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON ( %s ) PROPERTIES (\"type\" = \"full\")", formatKeyWordName(s.Database), snapshotName, tableRefs) log.Debugf("backup snapshot sql: %s", backupSnapshotSql) _, err = db.Exec(backupSnapshotSql) if err != nil { @@ -479,7 +489,7 @@ func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, error) { return BackupStateUnknown, err } - sql := fmt.Sprintf("SHOW BACKUP FROM %s WHERE SnapshotName = \"%s\"", s.Database, snapshotName) + sql := fmt.Sprintf("SHOW BACKUP FROM %s WHERE SnapshotName = \"%s\"", formatKeyWordName(s.Database), snapshotName) log.Debugf("check backup state sql: %s", sql) rows, err := db.Query(sql) if err != nil { @@ -532,7 +542,7 @@ func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, error) { return RestoreStateUnknown, err } - query := fmt.Sprintf("SHOW RESTORE FROM %s WHERE Label = \"%s\"", s.Database, snapshotName) + query := fmt.Sprintf("SHOW RESTORE FROM %s WHERE Label = \"%s\"", formatKeyWordName(s.Database), snapshotName) log.Debugf("check restore state sql: %s", query) rows, err := db.Query(query) @@ -590,7 +600,7 @@ func (s *Spec) waitTransactionDone(txnId int64) error { // WHERE // [id=transaction_id] // [label = label_name]; - query := fmt.Sprintf("SHOW TRANSACTION FROM %s WHERE id = %d", s.Database, txnId) + query := fmt.Sprintf("SHOW TRANSACTION FROM %s WHERE id = %d", formatKeyWordName(s.Database), txnId) log.Debugf("wait transaction done sql: %s", query) rows, err := db.Query(query) diff --git a/regression-test/suites/db-sync/test_db_sync.groovy b/regression-test/suites/db-sync/test_db_sync.groovy index 34479e05..014be56c 100644 --- a/regression-test/suites/db-sync/test_db_sync.groovy +++ b/regression-test/suites/db-sync/test_db_sync.groovy @@ -237,10 +237,12 @@ suite("test_db_sync") { def tableUnique1 = "tbl_common_1_" + UUID.randomUUID().toString().replace("-", "") def tableAggregate1 = "tbl_aggregate_1_" + UUID.randomUUID().toString().replace("-", "") def tableDuplicate1 = "tbl_duplicate_1_" + UUID.randomUUID().toString().replace("-", "") + def keywordTableName = "roles" createUniqueTable(tableUnique1) createAggergateTable(tableAggregate1) createDuplicateTable(tableDuplicate1) + createUniqueTable(keywordTableName) for (int index = 0; index < insert_num; index++) { sql """ @@ -257,6 +259,11 @@ suite("test_db_sync") { INSERT INTO ${tableDuplicate1} VALUES (0, 99, '${date_num}') """ } + for (int index = 0; index < insert_num; index++) { + sql """ + INSERT INTO ${keywordTableName} VALUES (${test_num}, ${index}, '${date_num}') + """ + } assertTrue(checkShowTimesOf("SHOW CREATE TABLE TEST_${context.dbName}.${tableUnique1}", exist, 30, "target")) @@ -273,10 +280,16 @@ suite("test_db_sync") { assertTrue(checkSelectTimesOf("SELECT * FROM ${tableDuplicate1} WHERE test=0", insert_num, 30)) + assertTrue(checkShowTimesOf("SHOW CREATE TABLE TEST_${context.dbName}.${keywordTableName}", + exist, 30, "target")) + assertTrue(checkSelectTimesOf("SELECT * FROM ${keywordTableName} WHERE test=${test_num}", + insert_num, 30)) + logger.info("=== Test 3: drop table case ===") sql "DROP TABLE ${tableUnique1}" sql "DROP TABLE ${tableAggregate1}" sql "DROP TABLE ${tableDuplicate1}" + sql "DROP TABLE ${keywordTableName}" assertTrue(checkShowTimesOf("SHOW TABLES LIKE '${tableUnique1}'", notExist, 30, "target")) @@ -284,6 +297,8 @@ suite("test_db_sync") { notExist, 30, "target")) assertTrue(checkShowTimesOf("SHOW TABLES LIKE '${tableDuplicate1}'", notExist, 30, "target")) + assertTrue(checkShowTimesOf("SHOW TABLES LIKE '${keywordTableName}'", + notExist, 30, "target")) logger.info("=== Test 4: pause and resume ===") httpTest { diff --git a/regression-test/suites/table-sync/test_keyword_nema.groovy b/regression-test/suites/table-sync/test_keyword_nema.groovy new file mode 100644 index 00000000..b9d64f45 --- /dev/null +++ b/regression-test/suites/table-sync/test_keyword_nema.groovy @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_keyword_nema") { + + def tableName = "roles" + def syncerAddress = "127.0.0.1:9190" + def test_num = 0 + def insert_num = 5 + def sync_gap_time = 5000 + def opPartitonName = "less0" + String response + + def checkSelectTimesOf = { sqlString, rowSize, times -> Boolean + def tmpRes = target_sql "${sqlString}" + while (tmpRes.size() != rowSize) { + sleep(sync_gap_time) + if (--times > 0) { + tmpRes = target_sql "${sqlString}" + } else { + break + } + } + return tmpRes.size() == rowSize + } + + def checkShowTimesOf = { sqlString, myClosure, times, func = "sql" -> Boolean + Boolean ret = false + List> res + while (times > 0) { + try { + if (func == "sql") { + res = sql "${sqlString}" + } else { + res = target_sql "${sqlString}" + } + if (myClosure.call(res)) { + ret = true + } + } catch (Exception e) {} + + if (ret) { + break + } else if (--times > 0) { + sleep(sync_gap_time) + } + } + + return ret + } + + def checkRestoreFinishTimesOf = { checkTable, times -> Boolean + Boolean ret = false + while (times > 0) { + def sqlInfo = target_sql "SHOW RESTORE FROM TEST_${context.dbName}" + for (List row : sqlInfo) { + if ((row[10] as String).contains(checkTable)) { + ret = (row[4] as String) == "FINISHED" + } + } + + if (ret) { + break + } else if (--times > 0) { + sleep(sync_gap_time) + } + } + + return ret + } + + def exist = { res -> Boolean + return res.size() != 0 + } + def notExist = { res -> Boolean + return res.size() == 0 + } + + sql """ + CREATE TABLE `${tableName}` ( + role_id INT, + occupation VARCHAR(32), + camp VARCHAR(32), + register_time DATE + ) + UNIQUE KEY(role_id) + DISTRIBUTED BY HASH(role_id) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "binlog.enable" = "true" + ); + """ + // sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")""" + + sql """ + INSERT INTO `${tableName}` VALUES + (0, 'who am I', NULL, NULL), + (1, 'mage', 'alliance', '2018-12-03 16:11:28'), + (2, 'paladin', 'alliance', '2018-11-30 16:11:28'), + (3, 'rogue', 'horde', '2018-12-01 16:11:28'), + (4, 'priest', 'alliance', '2018-12-02 16:11:28'), + (5, 'shaman', 'horde', NULL), + (6, 'warrior', 'alliance', NULL), + (7, 'warlock', 'horde', '2018-12-04 16:11:28'), + (8, 'hunter', 'horde', NULL); + """ + + httpTest { + uri "/create_ccr" + endpoint syncerAddress + def bodyJson = get_ccr_body "${tableName}" + body "${bodyJson}" + op "post" + result response + } + + assertTrue(checkRestoreFinishTimesOf("${tableName}", 30)) + + + + logger.info("=== Test 1: Check keyword name table ===") + // def checkShowTimesOf = { sqlString, myClosure, times, func = "sql" -> Boolean + assertTrue(checkShowTimesOf(""" + SHOW CREATE TABLE `TEST_${context.dbName}`.`${tableName}` + """, + exist, 30, "target")) + +} \ No newline at end of file