From 5324fab65838700640465c930641303100c95197 Mon Sep 17 00:00:00 2001 From: w41ter Date: Tue, 3 Dec 2024 10:52:50 +0800 Subject: [PATCH] Support private IP and public IP mapping --- doc/operations.md | 85 +++++++++++++++++++++++----- pkg/ccr/base/spec.go | 3 + pkg/ccr/job.go | 82 ++++++++++++++++++--------- pkg/ccr/job_manager.go | 11 ++++ pkg/ccr/meta.go | 24 +++++++- pkg/ccr/thrift_meta.go | 16 +++++- pkg/rpc/fe.go | 17 +++++- pkg/service/http_service.go | 67 +++++++++++++++++++--- regression-test/common/helper.groovy | 14 ++++- 9 files changed, 266 insertions(+), 53 deletions(-) diff --git a/doc/operations.md b/doc/operations.md index 41695ee9..8b89c0d3 100644 --- a/doc/operations.md +++ b/doc/operations.md @@ -1,14 +1,18 @@ # Syncer操作列表 + ### 请求的通用模板 + ```bash curl -X POST -H "Content-Type: application/json" -d {json_body} http://ccr_syncer_host:ccr_syncer_port/operator ``` -json_body: 以json的格式发送操作所需信息 -operator:对应Syncer的不同操作 +- json_body: 以json的格式发送操作所需信息 +- operator:对应Syncer的不同操作 + ### operators -- create_ccr - 创建CCR任务,详见[README](../README.md) -- get_lag + +- `create_ccr` + 创建CCR任务,详见[README](../README.md)。 +- `get_lag` 查看同步进度 ```bash curl -X POST -L --post303 -H "Content-Type: application/json" -d '{ @@ -16,48 +20,103 @@ operator:对应Syncer的不同操作 }' http://ccr_syncer_host:ccr_syncer_port/get_lag ``` 其中job_name是create_ccr时创建的name -- pause +- `pause` 暂停同步任务 ```bash curl -X POST -L --post303 -H "Content-Type: application/json" -d '{ "name": "job_name" }' http://ccr_syncer_host:ccr_syncer_port/pause ``` -- resume +- `resume` 恢复同步任务 ```bash curl -X POST -L --post303 -H "Content-Type: application/json" -d '{ "name": "job_name" }' http://ccr_syncer_host:ccr_syncer_port/resume ``` -- delete +- `delete` 删除同步任务 ```bash curl -X POST -L --post303 -H "Content-Type: application/json" -d '{ "name": "job_name" }' http://ccr_syncer_host:ccr_syncer_port/delete ``` -- list_jobs +- `list_jobs` 列出所有job名称 ```bash curl -X POST -L --post303 -H "Content-Type: application/json" -d '{}' 
http://ccr_syncer_host:ccr_syncer_port/list_jobs ``` -- job_detail +- `job_detail` 展示job的详细信息 ```bash curl -X POST -L --post303 -H "Content-Type: application/json" -d '{ "name": "job_name" }' http://ccr_syncer_host:ccr_syncer_port/job_detail ``` -- job_progress +- `job_progress` 展示job的详细进度信息 ```bash curl -X POST -L --post303 -H "Content-Type: application/json" -d '{ "name": "job_name" }' http://ccr_syncer_host:ccr_syncer_port/job_progress ``` -- metrics +- `metrics` 获取golang以及ccr job的metrics信息 ```bash - curl -L --post303 http://ccr_syncer_host:ccr_syncer_port/metrics + curl -L --post303 http://ccr_syncer_host:ccr_syncer_port/metrics ``` +- `update_host_mapping` + 更新上游 BE 集群 private ip 到 public ip 的映射;如果参数中的 public ip 为空,则删除该 private ip 的映射 + ```bash + curl -X POST -L --post303 -H "Content-Type: application/json" -d '{ + "name": "job_name", + "src_host_mapping": { + "172.168.1.1": "10.0.10.1", + "172.168.1.2": "10.0.10.2", + "172.168.1.3": "10.0.10.3", + "172.168.1.5": "" + }, + "dest_host_mapping": { + ... + } + }' http://ccr_syncer_host:ccr_syncer_port/update_host_mapping + ``` + 更新上游 172.168.1.1-3 的映射,同时删除 172.168.1.5 的映射。 + - `src_host_mapping`: 上游映射 + - `dest_host_mapping`: 下游映射 + +### 一些特殊场景 + +#### 上下游通过公网 IP 进行同步 + +ccr syncer 支持将上下游部署到不同的网络环境中,并通过公网 IP 进行数据同步。 + +具体方案:每个 job 会记录下上游 private IP 到 public IP 的映射关系(由用户提供),并在下游载入 binlog 前,将上游集群 BE 的 private IP 转换成对应的 public IP。 + +使用方式:创建 ccr job 时增加一个参数: +```bash +curl -X POST -H "Content-Type: application/json" -d '{ + "name": "ccr_test", + "src": { + "host_mapping": { + "172.168.1.1": "10.0.10.1", + "172.168.1.2": "10.0.10.2", + "172.168.1.3": "10.0.10.3" + }, + ... + }, + "dest": { + "host_mapping": { + "172.168.2.3": "10.0.10.9", + "172.168.2.4": "" + }, + ... 
+ }, +}' http://127.0.0.1:9190/create_ccr +``` + +`host_mapping` 用法与 `/update_host_mapping` 接口一致。 + +相关操作: +- 修改/删除/增加新映射,使用 `/update_host_mapping` 接口 +- 查看 job 的所有映射,使用 `/job_detail` 接口 diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go index f377fe4f..f6510257 100644 --- a/pkg/ccr/base/spec.go +++ b/pkg/ccr/base/spec.go @@ -220,6 +220,9 @@ type Spec struct { Table string `json:"table"` TableId int64 `json:"table_id"` + // The mapping of host private and public ip + HostMapping map[string]string `json:"host_mapping,omitempty"` + observers []utils.Observer[SpecEvent] } diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index c95879a4..3dfd7b40 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -140,55 +140,43 @@ type Job struct { lock sync.Mutex `json:"-"` } -type jobContext struct { +type JobContext struct { context.Context - src base.Spec - dest base.Spec - db storage.DB - skipError bool - allowTableExists bool - factory *Factory -} - -func NewJobContext(src, dest base.Spec, skipError bool, allowTableExists bool, db storage.DB, factory *Factory) *jobContext { - return &jobContext{ - Context: context.Background(), - src: src, - dest: dest, - skipError: skipError, - allowTableExists: allowTableExists, - db: db, - factory: factory, - } + Src base.Spec + Dest base.Spec + Db storage.DB + SkipError bool + AllowTableExists bool + Factory *Factory } // new job func NewJobFromService(name string, ctx context.Context) (*Job, error) { - jobContext, ok := ctx.(*jobContext) + jobContext, ok := ctx.(*JobContext) if !ok { return nil, xerror.Errorf(xerror.Normal, "invalid context type: %T", ctx) } - factory := jobContext.factory - src := jobContext.src - dest := jobContext.dest + factory := jobContext.Factory + src := jobContext.Src + dest := jobContext.Dest job := &Job{ Name: name, Src: src, ISrc: factory.NewSpecer(&src), - srcMeta: factory.NewMeta(&jobContext.src), + srcMeta: factory.NewMeta(&jobContext.Src), Dest: dest, IDest: factory.NewSpecer(&dest), - destMeta: 
factory.NewMeta(&jobContext.dest), - SkipError: jobContext.skipError, + destMeta: factory.NewMeta(&jobContext.Dest), + SkipError: jobContext.SkipError, State: JobRunning, - allowTableExists: jobContext.allowTableExists, + allowTableExists: jobContext.AllowTableExists, factory: factory, forceFullsync: false, progress: nil, - db: jobContext.db, + db: jobContext.Db, stop: make(chan struct{}), concurrencyManager: rpc.NewConcurrencyManager(), @@ -3384,6 +3372,44 @@ func (j *Job) Status() *JobStatus { } } +func (j *Job) UpdateHostMapping(srcHostMaps, destHostMaps map[string]string) error { + j.lock.Lock() + defer j.lock.Unlock() + + oldSrcHostMapping := j.Src.HostMapping + if j.Src.HostMapping == nil { + j.Src.HostMapping = make(map[string]string) + } + for private, public := range srcHostMaps { + if public == "" { + delete(j.Src.HostMapping, private) + } else { + j.Src.HostMapping[private] = public + } + } + + oldDestHostMapping := j.Dest.HostMapping + if j.Dest.HostMapping == nil { + j.Dest.HostMapping = make(map[string]string) + } + for private, public := range destHostMaps { + if public == "" { + delete(j.Dest.HostMapping, private) + } else { + j.Dest.HostMapping[private] = public + } + } + + if err := j.persistJob(); err != nil { + j.Src.HostMapping = oldSrcHostMapping + j.Dest.HostMapping = oldDestHostMapping + return err + } + + log.Debugf("update job %s src host mapping %+v, dest host mapping: %+v", j.Name, srcHostMaps, destHostMaps) + return nil +} + func isTxnCommitted(status *tstatus.TStatus) bool { return isStatusContainsAny(status, "is already COMMITTED") } diff --git a/pkg/ccr/job_manager.go b/pkg/ccr/job_manager.go index b0ae94d7..fa6e92a6 100644 --- a/pkg/ccr/job_manager.go +++ b/pkg/ccr/job_manager.go @@ -255,3 +255,14 @@ func (jm *JobManager) UpdateJobSkipError(jobName string, skipError bool) error { return xerror.Errorf(xerror.Normal, "job not exist: %s", jobName) } } + +func (jm *JobManager) UpdateHostMapping(jobName string, srcHostMapping, 
destHostMapping map[string]string) error { + jm.lock.Lock() + defer jm.lock.Unlock() + + if job, ok := jm.jobs[jobName]; ok { + return job.UpdateHostMapping(srcHostMapping, destHostMapping) + } else { + return xerror.Errorf(xerror.Normal, "job not exist: %s", jobName) + } +} diff --git a/pkg/ccr/meta.go b/pkg/ccr/meta.go index b4cbf36a..96b950f9 100644 --- a/pkg/ccr/meta.go +++ b/pkg/ccr/meta.go @@ -578,6 +578,17 @@ func (m *Meta) GetFrontends() ([]*base.Frontend, error) { return nil, xerror.Wrap(err, xerror.Normal, query) } + if len(m.HostMapping) != 0 { + for _, frontend := range frontends { + if host, ok := m.HostMapping[frontend.Host]; ok { + frontend.Host = host + } else { + return nil, xerror.Errorf(xerror.Normal, + "the public ip of host %s is not found, consider adding it via HTTP API /add_host_mapping", frontend.Host) + } + } + } + return frontends, nil } @@ -585,7 +596,18 @@ func (m *Meta) GetBackends() ([]*base.Backend, error) { if len(m.Backends) > 0 { backends := make([]*base.Backend, 0, len(m.Backends)) for _, backend := range m.Backends { - backends = append(backends, backend) + backend := *backend // copy + backends = append(backends, &backend) + } + if len(m.HostMapping) != 0 { + for _, backend := range backends { + if host, ok := m.HostMapping[backend.Host]; ok { + backend.Host = host + } else { + return nil, xerror.Errorf(xerror.Normal, + "the public ip of host %s is not found, consider adding it via HTTP API /add_host_mapping", backend.Host) + } + } } return backends, nil } diff --git a/pkg/ccr/thrift_meta.go b/pkg/ccr/thrift_meta.go index 10bc2206..ed64571d 100644 --- a/pkg/ccr/thrift_meta.go +++ b/pkg/ccr/thrift_meta.go @@ -249,7 +249,21 @@ func (tm *ThriftMeta) GetIndexNameMap(tableId, partitionId int64) (map[string]*I } func (tm *ThriftMeta) GetBackendMap() (map[int64]*base.Backend, error) { - return tm.meta.Backends, nil + if tm.meta.HostMapping == nil { + return tm.meta.Backends, nil + } + + backends := make(map[int64]*base.Backend) + for 
id, backend := range tm.meta.Backends { + if host, ok := tm.meta.HostMapping[backend.Host]; ok { + backend.Host = host + } else { + return nil, xerror.Errorf(xerror.Normal, + "the public ip of host %s is not found, consider adding it via HTTP API /update_host_mapping", backend.Host) + } + backends[id] = backend + } + return backends, nil } // Whether the target partition are dropped diff --git a/pkg/rpc/fe.go b/pkg/rpc/fe.go index c826ab23..01217118 100644 --- a/pkg/rpc/fe.go +++ b/pkg/rpc/fe.go @@ -263,10 +263,25 @@ func (r *retryWithMasterRedirectAndCachedClientsRpc) call0(masterClient IFeRpc) // switch to master masterAddr := resp.GetMasterAddress() err = xerror.Errorf(xerror.FE, "addr [%s] is not master", masterAddr) + + // convert private ip to public ip, if need + hostname := masterAddr.Hostname + if r.rpc.spec.HostMapping != nil { + if host, ok := r.rpc.spec.HostMapping[hostname]; ok { + hostname = host + } else { + return &call0Result{ + canUseNextAddr: true, + err: xerror.Errorf(xerror.Normal, + "the public ip of %s is not found, consider adding it via HTTP API /update_host_mapping", hostname), + } + } + } + return &call0Result{ canUseNextAddr: true, resp: resp, - masterAddr: fmt.Sprintf("%s:%d", masterAddr.Hostname, masterAddr.Port), + masterAddr: fmt.Sprintf("%s:%d", hostname, masterAddr.Port), err: err, // not nil } } diff --git a/pkg/service/http_service.go b/pkg/service/http_service.go index 33f73dec..36bd3f63 100644 --- a/pkg/service/http_service.go +++ b/pkg/service/http_service.go @@ -97,12 +97,12 @@ func (s *HttpService) versionHandler(w http.ResponseWriter, r *http.Request) { log.Infof("get version") // Define the version result struct - type vesionResult struct { + type versionResult struct { Version string `json:"version"` } // Create the result object with the current version - result := vesionResult{Version: version.GetVersion()} + result := versionResult{Version: version.GetVersion()} writeJson(w, result) } @@ -111,7 +111,15 @@ func (s 
*HttpService) versionHandler(w http.ResponseWriter, r *http.Request) { func createCcr(request *CreateCcrRequest, db storage.DB, jobManager *ccr.JobManager) error { log.Infof("create ccr %s", request) - ctx := ccr.NewJobContext(request.Src, request.Dest, request.SkipError, request.AllowTableExists, db, jobManager.GetFactory()) + ctx := &ccr.JobContext{ + Context: context.Background(), + Src: request.Src, + Dest: request.Dest, + SkipError: request.SkipError, + AllowTableExists: request.AllowTableExists, + Db: db, + Factory: jobManager.GetFactory(), + } job, err := ccr.NewJobFromService(request.Name, ctx) if err != nil { return err @@ -579,7 +587,7 @@ func (s *HttpService) jobDetailHandler(w http.ResponseWriter, r *http.Request) { type result struct { *defaultResult - JobDetail string `json:"job_detail"` + JobDetail *ccr.Job `json:"job_detail"` } var jobResult *result @@ -610,16 +618,21 @@ func (s *HttpService) jobDetailHandler(w http.ResponseWriter, r *http.Request) { return } - if jobDetail, err := s.db.GetJobInfo(request.Name); err != nil { + var jobDetail ccr.Job + if jobDetailStr, err := s.db.GetJobInfo(request.Name); err != nil { log.Warnf("get job info failed: %+v", err) - + jobResult = &result{ + defaultResult: newErrorResult(err.Error()), + } + } else if err = json.Unmarshal([]byte(jobDetailStr), &jobDetail); err != nil { + log.Warnf("unmarshal job info failed: %+v", err) jobResult = &result{ defaultResult: newErrorResult(err.Error()), } } else { jobResult = &result{ defaultResult: newSuccessResult(), - JobDetail: jobDetail, + JobDetail: &jobDetail, } } } @@ -690,6 +703,45 @@ func (s *HttpService) featuresHandler(w http.ResponseWriter, r *http.Request) { }) } +func (s *HttpService) updateHostMappingHandler(w http.ResponseWriter, r *http.Request) { + log.Infof("update host mapping") + + var result *defaultResult + defer func() { writeJson(w, result) }() + + // Parse the JSON request body + var request struct { + CcrCommonRequest + SrcHostMapping 
map[string]string `json:"src_host_mapping,required"` + DestHostMapping map[string]string `json:"dest_host_mapping,required"` + } + err := json.NewDecoder(r.Body).Decode(&request) + if err != nil { + log.Warnf("update host mapping failed: %+v", err) + result = newErrorResult(err.Error()) + return + } + + if request.Name == "" { + log.Warnf("update host mapping failed: name is empty") + result = newErrorResult("name is empty") + return + } + + if len(request.SrcHostMapping) == 0 && len(request.DestHostMapping) == 0 { + log.Warnf("update host mapping failed: src/dest_host_mapping is empty") + result = newErrorResult("host_mapping is empty") + return + } + + if err := s.jobManager.UpdateHostMapping(request.Name, request.SrcHostMapping, request.DestHostMapping); err != nil { + log.Warnf("update host mapping failed: %+v", err) + result = newErrorResult(err.Error()) + } else { + result = newSuccessResult() + } +} + func (s *HttpService) RegisterHandlers() { s.mux.HandleFunc("/version", s.versionHandler) s.mux.HandleFunc("/create_ccr", s.createHandler) @@ -705,6 +757,7 @@ func (s *HttpService) RegisterHandlers() { s.mux.HandleFunc("/job_progress", s.jobProgressHandler) s.mux.HandleFunc("/force_fullsync", s.forceFullsyncHandler) s.mux.HandleFunc("/features", s.featuresHandler) + s.mux.HandleFunc("/update_host_mapping", s.updateHostMappingHandler) s.mux.Handle("/metrics", promhttp.Handler()) } diff --git a/regression-test/common/helper.groovy b/regression-test/common/helper.groovy index 40d679d6..9fc243d6 100644 --- a/regression-test/common/helper.groovy +++ b/regression-test/common/helper.groovy @@ -85,10 +85,10 @@ class Helper { def gson = new com.google.gson.Gson() - Map srcSpec = context.getSrcSpec(db) + Map srcSpec = context.getSrcSpec(db) srcSpec.put("table", table) - Map destSpec = context.getDestSpec(db) + Map destSpec = context.getDestSpec(db) if (alias != null) { destSpec.put("table", alias) } else { @@ -124,6 +124,16 @@ class Helper { endpoint syncerAddress body 
"${bodyJson}" op "post" + check { code, body -> + if (!"${code}".toString().equals("200")) { + throw new Exception("request failed, code: ${code}, body: ${body}") + } + def jsonSlurper = new groovy.json.JsonSlurper() + def object = jsonSlurper.parseText "${body}" + if (!object.success) { + throw new Exception("request failed, error msg: ${object.error_msg}") + } + } } }