Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func (s BackupState) String() string {
}
}

func formatKeyWordName(name string) string {
return "`" + strings.TrimSpace(name) + "`"

}

func ParseBackupState(state string) BackupState {
switch state {
case "PENDING":
Expand Down Expand Up @@ -196,7 +201,7 @@ func (s *Spec) IsDatabaseEnableBinlog() (bool, error) {
}

var createDBString string
query := fmt.Sprintf("SHOW CREATE DATABASE %s", s.Database)
query := fmt.Sprintf("SHOW CREATE DATABASE %s", formatKeyWordName(s.Database))
rows, err := db.Query(query)
if err != nil {
return false, xerror.Wrap(err, xerror.Normal, query)
Expand Down Expand Up @@ -234,7 +239,7 @@ func (s *Spec) IsTableEnableBinlog() (bool, error) {
}

var createTableString string
query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", s.Database, s.Table)
query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", formatKeyWordName(s.Database), formatKeyWordName(s.Table))
rows, err := db.Query(query)
if err != nil {
return false, xerror.Wrap(err, xerror.Normal, query)
Expand Down Expand Up @@ -300,7 +305,7 @@ func (s *Spec) dropTable(table string) error {
return err
}

sql := fmt.Sprintf("DROP TABLE %s.%s", s.Database, table)
sql := fmt.Sprintf("DROP TABLE %s.%s", formatKeyWordName(s.Database), formatKeyWordName(table))
_, err = db.Exec(sql)
if err != nil {
return xerror.Wrapf(err, xerror.Normal, "drop table %s.%s failed, sql: %s", s.Database, table, sql)
Expand All @@ -316,13 +321,13 @@ func (s *Spec) ClearDB() error {
return err
}

sql := fmt.Sprintf("DROP DATABASE %s", s.Database)
sql := fmt.Sprintf("DROP DATABASE %s", formatKeyWordName(s.Database))
_, err = db.Exec(sql)
if err != nil {
return xerror.Wrapf(err, xerror.Normal, "drop database %s failed", s.Database)
}

if _, err = db.Exec("CREATE DATABASE " + s.Database); err != nil {
if _, err = db.Exec("CREATE DATABASE " + formatKeyWordName(s.Database)); err != nil {
return xerror.Wrapf(err, xerror.Normal, "create database %s failed", s.Database)
}
return nil
Expand All @@ -336,7 +341,7 @@ func (s *Spec) CreateDatabase() error {
return nil
}

if _, err = db.Exec("CREATE DATABASE IF NOT EXISTS " + s.Database); err != nil {
if _, err = db.Exec("CREATE DATABASE IF NOT EXISTS " + formatKeyWordName(s.Database)); err != nil {
return xerror.Wrapf(err, xerror.Normal, "create database %s failed", s.Database)
}
return nil
Expand Down Expand Up @@ -396,7 +401,7 @@ func (s *Spec) CheckTableExists() (bool, error) {
return false, err
}

sql := fmt.Sprintf("SHOW TABLES FROM %s LIKE '%s'", s.Database, s.Table)
sql := fmt.Sprintf("SHOW TABLES FROM %s LIKE '%s'", formatKeyWordName(s.Database), s.Table)
rows, err := db.Query(sql)
if err != nil {
return false, xerror.Wrapf(err, xerror.Normal, "show tables failed, sql: %s", sql)
Expand Down Expand Up @@ -436,12 +441,17 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) {
// snapshot name format "ccrs_${table}_${timestamp}"
// table refs = table
snapshotName = fmt.Sprintf("ccrs_%s_%s_%d", s.Database, s.Table, time.Now().Unix())
tableRefs = tables[0]
tableRefs = formatKeyWordName(tables[0])
} else {
// snapshot name format "ccrs_${db}_${timestamp}"
// table refs = tables.join(", ")
snapshotName = fmt.Sprintf("ccrs_%s_%d", s.Database, time.Now().Unix())
tableRefs = strings.Join(tables, ", ")
tableRefs = "`" + strings.Join(tables, "`,`") + "`"
}

// means source is a empty db, table numer is 0
if tableRefs == "``" {
return "", xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table")
}

log.Infof("create snapshot %s.%s", s.Database, snapshotName)
Expand All @@ -451,7 +461,7 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) {
return "", err
}

backupSnapshotSql := fmt.Sprintf("BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON ( %s ) PROPERTIES (\"type\" = \"full\")", s.Database, snapshotName, tableRefs)
backupSnapshotSql := fmt.Sprintf("BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON ( %s ) PROPERTIES (\"type\" = \"full\")", formatKeyWordName(s.Database), snapshotName, tableRefs)
log.Debugf("backup snapshot sql: %s", backupSnapshotSql)
_, err = db.Exec(backupSnapshotSql)
if err != nil {
Expand Down Expand Up @@ -479,7 +489,7 @@ func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, error) {
return BackupStateUnknown, err
}

sql := fmt.Sprintf("SHOW BACKUP FROM %s WHERE SnapshotName = \"%s\"", s.Database, snapshotName)
sql := fmt.Sprintf("SHOW BACKUP FROM %s WHERE SnapshotName = \"%s\"", formatKeyWordName(s.Database), snapshotName)
log.Debugf("check backup state sql: %s", sql)
rows, err := db.Query(sql)
if err != nil {
Expand Down Expand Up @@ -532,7 +542,7 @@ func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, error) {
return RestoreStateUnknown, err
}

query := fmt.Sprintf("SHOW RESTORE FROM %s WHERE Label = \"%s\"", s.Database, snapshotName)
query := fmt.Sprintf("SHOW RESTORE FROM %s WHERE Label = \"%s\"", formatKeyWordName(s.Database), snapshotName)

log.Debugf("check restore state sql: %s", query)
rows, err := db.Query(query)
Expand Down Expand Up @@ -590,7 +600,7 @@ func (s *Spec) waitTransactionDone(txnId int64) error {
// WHERE
// [id=transaction_id]
// [label = label_name];
query := fmt.Sprintf("SHOW TRANSACTION FROM %s WHERE id = %d", s.Database, txnId)
query := fmt.Sprintf("SHOW TRANSACTION FROM %s WHERE id = %d", formatKeyWordName(s.Database), txnId)

log.Debugf("wait transaction done sql: %s", query)
rows, err := db.Query(query)
Expand Down
15 changes: 15 additions & 0 deletions regression-test/suites/db-sync/test_db_sync.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,12 @@ suite("test_db_sync") {
def tableUnique1 = "tbl_common_1_" + UUID.randomUUID().toString().replace("-", "")
def tableAggregate1 = "tbl_aggregate_1_" + UUID.randomUUID().toString().replace("-", "")
def tableDuplicate1 = "tbl_duplicate_1_" + UUID.randomUUID().toString().replace("-", "")
def keywordTableName = "roles"

createUniqueTable(tableUnique1)
createAggergateTable(tableAggregate1)
createDuplicateTable(tableDuplicate1)
createUniqueTable(keywordTableName)

for (int index = 0; index < insert_num; index++) {
sql """
Expand All @@ -257,6 +259,11 @@ suite("test_db_sync") {
INSERT INTO ${tableDuplicate1} VALUES (0, 99, '${date_num}')
"""
}
for (int index = 0; index < insert_num; index++) {
sql """
INSERT INTO ${keywordTableName} VALUES (${test_num}, ${index}, '${date_num}')
"""
}

assertTrue(checkShowTimesOf("SHOW CREATE TABLE TEST_${context.dbName}.${tableUnique1}",
exist, 30, "target"))
Expand All @@ -273,17 +280,25 @@ suite("test_db_sync") {
assertTrue(checkSelectTimesOf("SELECT * FROM ${tableDuplicate1} WHERE test=0",
insert_num, 30))

assertTrue(checkShowTimesOf("SHOW CREATE TABLE TEST_${context.dbName}.${keywordTableName}",
exist, 30, "target"))
assertTrue(checkSelectTimesOf("SELECT * FROM ${keywordTableName} WHERE test=${test_num}",
insert_num, 30))

logger.info("=== Test 3: drop table case ===")
sql "DROP TABLE ${tableUnique1}"
sql "DROP TABLE ${tableAggregate1}"
sql "DROP TABLE ${tableDuplicate1}"
sql "DROP TABLE ${keywordTableName}"

assertTrue(checkShowTimesOf("SHOW TABLES LIKE '${tableUnique1}'",
notExist, 30, "target"))
assertTrue(checkShowTimesOf("SHOW TABLES LIKE '${tableAggregate1}'",
notExist, 30, "target"))
assertTrue(checkShowTimesOf("SHOW TABLES LIKE '${tableDuplicate1}'",
notExist, 30, "target"))
assertTrue(checkShowTimesOf("SHOW TABLES LIKE '${keywordTableName}'",
notExist, 30, "target"))

logger.info("=== Test 4: pause and resume ===")
httpTest {
Expand Down
142 changes: 142 additions & 0 deletions regression-test/suites/table-sync/test_keyword_nema.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_keyword_nema") {

def tableName = "roles"
def syncerAddress = "127.0.0.1:9190"
def test_num = 0
def insert_num = 5
def sync_gap_time = 5000
def opPartitonName = "less0"
String response

def checkSelectTimesOf = { sqlString, rowSize, times -> Boolean
def tmpRes = target_sql "${sqlString}"
while (tmpRes.size() != rowSize) {
sleep(sync_gap_time)
if (--times > 0) {
tmpRes = target_sql "${sqlString}"
} else {
break
}
}
return tmpRes.size() == rowSize
}

def checkShowTimesOf = { sqlString, myClosure, times, func = "sql" -> Boolean
Boolean ret = false
List<List<Object>> res
while (times > 0) {
try {
if (func == "sql") {
res = sql "${sqlString}"
} else {
res = target_sql "${sqlString}"
}
if (myClosure.call(res)) {
ret = true
}
} catch (Exception e) {}

if (ret) {
break
} else if (--times > 0) {
sleep(sync_gap_time)
}
}

return ret
}

def checkRestoreFinishTimesOf = { checkTable, times -> Boolean
Boolean ret = false
while (times > 0) {
def sqlInfo = target_sql "SHOW RESTORE FROM TEST_${context.dbName}"
for (List<Object> row : sqlInfo) {
if ((row[10] as String).contains(checkTable)) {
ret = (row[4] as String) == "FINISHED"
}
}

if (ret) {
break
} else if (--times > 0) {
sleep(sync_gap_time)
}
}

return ret
}

def exist = { res -> Boolean
return res.size() != 0
}
def notExist = { res -> Boolean
return res.size() == 0
}

sql """
CREATE TABLE `${tableName}` (
role_id INT,
occupation VARCHAR(32),
camp VARCHAR(32),
register_time DATE
)
UNIQUE KEY(role_id)
DISTRIBUTED BY HASH(role_id) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"binlog.enable" = "true"
);
"""
// sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""

sql """
INSERT INTO `${tableName}` VALUES
(0, 'who am I', NULL, NULL),
(1, 'mage', 'alliance', '2018-12-03 16:11:28'),
(2, 'paladin', 'alliance', '2018-11-30 16:11:28'),
(3, 'rogue', 'horde', '2018-12-01 16:11:28'),
(4, 'priest', 'alliance', '2018-12-02 16:11:28'),
(5, 'shaman', 'horde', NULL),
(6, 'warrior', 'alliance', NULL),
(7, 'warlock', 'horde', '2018-12-04 16:11:28'),
(8, 'hunter', 'horde', NULL);
"""

httpTest {
uri "/create_ccr"
endpoint syncerAddress
def bodyJson = get_ccr_body "${tableName}"
body "${bodyJson}"
op "post"
result response
}

assertTrue(checkRestoreFinishTimesOf("${tableName}", 30))



logger.info("=== Test 1: Check keyword name table ===")
// def checkShowTimesOf = { sqlString, myClosure, times, func = "sql" -> Boolean
assertTrue(checkShowTimesOf("""
SHOW CREATE TABLE `TEST_${context.dbName}`.`${tableName}`
""",
exist, 30, "target"))

}