diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index e2001f32..c95879a4 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -1426,6 +1426,16 @@ func (j *Job) ingestBinlogForTxnInsert(txnId int64, tableRecords []*record.Table return subTxnInfos, nil } +func (j *Job) handleUpsertWithRetry(binlog *festruct.TBinlog) error { + err := j.handleUpsert(binlog) + if !xerror.IsCategory(err, xerror.Meta) { + return err + } + + log.Warnf("a meta error occurred, retry to handle upsert binlog again, commitSeq: %d", binlog.GetCommitSeq()) + return j.handleUpsert(binlog) +} + func (j *Job) handleUpsert(binlog *festruct.TBinlog) error { log.Infof("handle upsert binlog, sub sync state: %s, prevCommitSeq: %d, commitSeq: %d", j.progress.SubSyncState, j.progress.PrevCommitSeq, j.progress.CommitSeq) @@ -2707,7 +2717,7 @@ func (j *Job) handleBinlog(binlog *festruct.TBinlog) error { switch binlog.GetType() { case festruct.TBinlogType_UPSERT: - return j.handleUpsert(binlog) + return j.handleUpsertWithRetry(binlog) case festruct.TBinlogType_ADD_PARTITION: return j.handleAddPartition(binlog) case festruct.TBinlogType_CREATE_TABLE: diff --git a/pkg/xerror/xerror.go b/pkg/xerror/xerror.go index 42f8a2de..95781d18 100644 --- a/pkg/xerror/xerror.go +++ b/pkg/xerror/xerror.go @@ -231,3 +231,15 @@ func WithStack(err error) error { callers(4), } } + +func IsCategory(err error, category ErrorCategory) bool { + if err == nil { + return false + } + + if xerr, ok := err.(*XError); ok { + return xerr.category == category + } + + return false +}