Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 72 additions & 13 deletions doc/operations.md
Original file line number Diff line number Diff line change
@@ -1,63 +1,122 @@
# Syncer操作列表

### 请求的通用模板

```bash
curl -X POST -H "Content-Type: application/json" -d {json_body} http://ccr_syncer_host:ccr_syncer_port/operator
```
json_body: 以json的格式发送操作所需信息
operator:对应Syncer的不同操作
- json_body: 以json的格式发送操作所需信息
- operator:对应Syncer的不同操作

### operators
- create_ccr
创建CCR任务,详见[README](../README.md)
- get_lag

- `create_ccr`
创建CCR任务,详见[README](../README.md)。
- `get_lag`
查看同步进度
```bash
curl -X POST -L --post303 -H "Content-Type: application/json" -d '{
"name": "job_name"
}' http://ccr_syncer_host:ccr_syncer_port/get_lag
```
其中job_name是create_ccr时创建的name
- pause
- `pause`
暂停同步任务
```bash
curl -X POST -L --post303 -H "Content-Type: application/json" -d '{
"name": "job_name"
}' http://ccr_syncer_host:ccr_syncer_port/pause
```
- resume
- `resume`
恢复同步任务
```bash
curl -X POST -L --post303 -H "Content-Type: application/json" -d '{
"name": "job_name"
}' http://ccr_syncer_host:ccr_syncer_port/resume
```
- delete
- `delete`
删除同步任务
```bash
curl -X POST -L --post303 -H "Content-Type: application/json" -d '{
"name": "job_name"
}' http://ccr_syncer_host:ccr_syncer_port/delete
```
- list_jobs
- `list_jobs`
列出所有job名称
```bash
curl -X POST -L --post303 -H "Content-Type: application/json" -d '{}' http://ccr_syncer_host:ccr_syncer_port/list_jobs
```
- job_detail
- `job_detail`
展示job的详细信息
```bash
curl -X POST -L --post303 -H "Content-Type: application/json" -d '{
"name": "job_name"
}' http://ccr_syncer_host:ccr_syncer_port/job_detail
```
- job_progress
- `job_progress`
展示job的详细进度信息
```bash
curl -X POST -L --post303 -H "Content-Type: application/json" -d '{
"name": "job_name"
}' http://ccr_syncer_host:ccr_syncer_port/job_progress
```
- metrics
- `metrics`
获取golang以及ccr job的metrics信息
```bash
curl -L --post303 http://ccr_syncer_host:ccr_syncer_port/metrics
curl -L --post303 http://ccr_syncer_host:ccr_syncer_port/metrics
```
- `update_host_mapping`
更新上游 BE 集群 private ip 到 public ip 的映射;如果参数中的 public ip 为空,则删除该 private 的映射
```bash
curl -X POST -L --post303 -H "Content-Type: application/json" -d '{
"name": "job_name",
"src_host_mapping": {
"172.168.1.1": "10.0.10.1",
"172.168.1.2": "10.0.10.2",
"172.168.1.3": "10.0.10.3",
"172.168.1.5": ""
},
"dest_host_mapping": {
...
}
}' http://ccr_syncer_host:ccr_syncer_port/add_host_mapping
```
更新上游 172.168.1.1-3 的映射,同时删除 172.168.1.5 的映射。
- `src_host_mapping`: 上游映射
- `dest_host_mapping`: 下游映射

### 一些特殊场景

#### 上下游通过公网 IP 进行同步

ccr syncer 支持将上下游部署到不同的网络环境中,并通过公网 IP 进行数据同步。

具体方案:每个 job 会记录下上游 private IP 到 public IP 的映射关系(由用户提供),并在下游载入 binlog 前,将上游集群 BE 的 private 转换成对应的 public IP。

使用方式:创建 ccr job 时增加一个参数:
```bash
curl -X POST -H "Content-Type: application/json" -d '{
"name": "ccr_test",
"src": {
"host_mapping": {
"172.168.1.1": "10.0.10.1",
"172.168.1.2": "10.0.10.2",
"172.168.1.3": "10.0.10.3"
},
...
},
"dest": {
"host_mapping": {
"172.168.2.3": "10.0.10.9",
"172.168.2.4": ""
},
...
},
}' http://127.0.0.1:9190/create_ccr
```

`host_mapping` 用法与 `/update_host_mapping` 接口一致。

相关操作:
- 修改/删除/增加新映射,使用 `/update_host_mapping` 接口
- 查看 job 的所有映射,使用 `/job_detail` 接口
3 changes: 3 additions & 0 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ type Spec struct {
Table string `json:"table"`
TableId int64 `json:"table_id"`

// The mapping of host private and public ip
HostMapping map[string]string `json:"host_mapping,omitempty"`

observers []utils.Observer[SpecEvent]
}

Expand Down
82 changes: 54 additions & 28 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,55 +140,43 @@ type Job struct {
lock sync.Mutex `json:"-"`
}

type jobContext struct {
type JobContext struct {
context.Context
src base.Spec
dest base.Spec
db storage.DB
skipError bool
allowTableExists bool
factory *Factory
}

func NewJobContext(src, dest base.Spec, skipError bool, allowTableExists bool, db storage.DB, factory *Factory) *jobContext {
return &jobContext{
Context: context.Background(),
src: src,
dest: dest,
skipError: skipError,
allowTableExists: allowTableExists,
db: db,
factory: factory,
}
Src base.Spec
Dest base.Spec
Db storage.DB
SkipError bool
AllowTableExists bool
Factory *Factory
}

// new job
func NewJobFromService(name string, ctx context.Context) (*Job, error) {
jobContext, ok := ctx.(*jobContext)
jobContext, ok := ctx.(*JobContext)
if !ok {
return nil, xerror.Errorf(xerror.Normal, "invalid context type: %T", ctx)
}

factory := jobContext.factory
src := jobContext.src
dest := jobContext.dest
factory := jobContext.Factory
src := jobContext.Src
dest := jobContext.Dest
job := &Job{
Name: name,
Src: src,
ISrc: factory.NewSpecer(&src),
srcMeta: factory.NewMeta(&jobContext.src),
srcMeta: factory.NewMeta(&jobContext.Src),
Dest: dest,
IDest: factory.NewSpecer(&dest),
destMeta: factory.NewMeta(&jobContext.dest),
SkipError: jobContext.skipError,
destMeta: factory.NewMeta(&jobContext.Dest),
SkipError: jobContext.SkipError,
State: JobRunning,

allowTableExists: jobContext.allowTableExists,
allowTableExists: jobContext.AllowTableExists,
factory: factory,
forceFullsync: false,

progress: nil,
db: jobContext.db,
db: jobContext.Db,
stop: make(chan struct{}),

concurrencyManager: rpc.NewConcurrencyManager(),
Expand Down Expand Up @@ -3384,6 +3372,44 @@ func (j *Job) Status() *JobStatus {
}
}

func (j *Job) UpdateHostMapping(srcHostMaps, destHostMaps map[string]string) error {
j.lock.Lock()
defer j.lock.Unlock()

oldSrcHostMapping := j.Src.HostMapping
if j.Src.HostMapping == nil {
j.Src.HostMapping = make(map[string]string)
}
for private, public := range srcHostMaps {
if public == "" {
delete(j.Src.HostMapping, private)
} else {
j.Src.HostMapping[private] = public
}
}

oldDestHostMapping := j.Dest.HostMapping
if j.Dest.HostMapping == nil {
j.Dest.HostMapping = make(map[string]string)
}
for private, public := range destHostMaps {
if public == "" {
delete(j.Dest.HostMapping, private)
} else {
j.Dest.HostMapping[private] = public
}
}

if err := j.persistJob(); err != nil {
j.Src.HostMapping = oldSrcHostMapping
j.Dest.HostMapping = oldDestHostMapping
return err
}

log.Debugf("update job %s src host mapping %+v, dest host mapping: %+v", j.Name, srcHostMaps, destHostMaps)
return nil
}

func isTxnCommitted(status *tstatus.TStatus) bool {
return isStatusContainsAny(status, "is already COMMITTED")
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/ccr/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,14 @@ func (jm *JobManager) UpdateJobSkipError(jobName string, skipError bool) error {
return xerror.Errorf(xerror.Normal, "job not exist: %s", jobName)
}
}

func (jm *JobManager) UpdateHostMapping(jobName string, srcHostMapping, destHostMapping map[string]string) error {
jm.lock.Lock()
defer jm.lock.Unlock()

if job, ok := jm.jobs[jobName]; ok {
return job.UpdateHostMapping(srcHostMapping, destHostMapping)
} else {
return xerror.Errorf(xerror.Normal, "job not exist: %s", jobName)
}
}
24 changes: 23 additions & 1 deletion pkg/ccr/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,14 +578,36 @@ func (m *Meta) GetFrontends() ([]*base.Frontend, error) {
return nil, xerror.Wrap(err, xerror.Normal, query)
}

if len(m.HostMapping) != 0 {
for _, frontend := range frontends {
if host, ok := m.HostMapping[frontend.Host]; ok {
frontend.Host = host
} else {
return nil, xerror.Errorf(xerror.Normal,
"the public ip of host %s is not found, consider adding it via HTTP API /add_host_mapping", frontend.Host)
}
}
}

return frontends, nil
}

func (m *Meta) GetBackends() ([]*base.Backend, error) {
if len(m.Backends) > 0 {
backends := make([]*base.Backend, 0, len(m.Backends))
for _, backend := range m.Backends {
backends = append(backends, backend)
backend := *backend // copy
backends = append(backends, &backend)
}
if len(m.HostMapping) != 0 {
for _, backend := range backends {
if host, ok := m.HostMapping[backend.Host]; ok {
backend.Host = host
} else {
return nil, xerror.Errorf(xerror.Normal,
"the public ip of host %s is not found, consider adding it via HTTP API /add_host_mapping", backend.Host)
}
}
}
return backends, nil
}
Expand Down
16 changes: 15 additions & 1 deletion pkg/ccr/thrift_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,21 @@ func (tm *ThriftMeta) GetIndexNameMap(tableId, partitionId int64) (map[string]*I
}

func (tm *ThriftMeta) GetBackendMap() (map[int64]*base.Backend, error) {
return tm.meta.Backends, nil
if tm.meta.HostMapping == nil {
return tm.meta.Backends, nil
}

backends := make(map[int64]*base.Backend)
for id, backend := range tm.meta.Backends {
if host, ok := tm.meta.HostMapping[backend.Host]; ok {
backend.Host = host
} else {
return nil, xerror.Errorf(xerror.Normal,
"the public ip of host %s is not found, consider adding it via HTTP API /update_host_mapping", backend.Host)
}
backends[id] = backend
}
return backends, nil
}

// Whether the target partition are dropped
Expand Down
17 changes: 16 additions & 1 deletion pkg/rpc/fe.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,25 @@ func (r *retryWithMasterRedirectAndCachedClientsRpc) call0(masterClient IFeRpc)
// switch to master
masterAddr := resp.GetMasterAddress()
err = xerror.Errorf(xerror.FE, "addr [%s] is not master", masterAddr)

// convert private ip to public ip, if need
hostname := masterAddr.Hostname
if r.rpc.spec.HostMapping != nil {
if host, ok := r.rpc.spec.HostMapping[hostname]; ok {
hostname = host
} else {
return &call0Result{
canUseNextAddr: true,
err: xerror.Errorf(xerror.Normal,
"the public ip of %s is not found, consider adding it via HTTP API /update_host_mapping", hostname),
}
}
}

return &call0Result{
canUseNextAddr: true,
resp: resp,
masterAddr: fmt.Sprintf("%s:%d", masterAddr.Hostname, masterAddr.Port),
masterAddr: fmt.Sprintf("%s:%d", hostname, masterAddr.Port),
err: err, // not nil
}
}
Expand Down
Loading
Loading