Skip to content

Commit 23711c9

Browse files
authored
fix(replication): calculate LogPos for non-artificial events in MariaDB 11.4+ (#1052)
* fix(replication): calculate LogPos for non-artificial events in MariaDB 11.4+ * fix(replication): add configuration parameter to enable calculation of LogPos for non-artificial events in MariaDB 11.4+
1 parent 84c5e8e commit 23711c9

File tree

2 files changed

+61
-2
lines changed

2 files changed

+61
-2
lines changed

README.md

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,33 @@ Schema: test
133133
Query: DROP TABLE IF EXISTS `test_replication` /* generated by server */
134134
```
135135

136-
## Canal
136+
### MariaDB 11.4+ compatibility
137+
138+
MariaDB 11.4+ introduced an optimization where events written through transaction or statement cache have `LogPos=0` so they can be copied directly to the binlog without computing the real end position. This optimization improves performance but makes position tracking unreliable for replication clients that need to track LogPos of events inside transactions.
139+
140+
To address this, a `MariaDBDynamicLogPos` configuration option is available:
141+
142+
```go
143+
cfg := replication.BinlogSyncerConfig {
144+
ServerID: 100,
145+
Flavor: "mariadb",
146+
Host: "127.0.0.1",
147+
Port: 3306,
148+
User: "root",
149+
Password: "",
150+
// Enable dynamic LogPos calculation for MariaDB 11.4+
151+
MariaDBDynamicLogPos: true,
152+
}
153+
```
154+
155+
**Behavior:**
156+
- When `MariaDBDynamicLogPos` is `true` and flavor is `mariadb`, the library automatically:
157+
- Adds `BINLOG_SEND_ANNOTATE_ROWS_EVENT` flag to binlog dump commands. This ensures correct position tracking by making the server send `ANNOTATE_ROWS_EVENT` events which are needed for accurate position calculation.
158+
- Calculates LogPos dynamically for events with `LogPos=0` that are not artificial.
159+
- Only works with MariaDB flavor; has no effect with MySQL.
160+
- Should be set to `true` if tracking of LogPos inside transactions is required.
161+
162+
## Canal
137163

138164
Canal is a package that can sync your MySQL into everywhere, like Redis, Elasticsearch.
139165

replication/binlogsyncer.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,12 @@ type BinlogSyncerConfig struct {
128128

129129
EventCacheCount int
130130

131+
// MariaDBDynamicLogPos enables dynamic LogPos calculation for MariaDB.
132+
// When enabled, automatically adds BINLOG_SEND_ANNOTATE_ROWS_EVENT flag
133+
// to ensure correct position calculation in MariaDB 11.4+.
134+
// Only works with MariaDB flavor.
135+
MariaDBDynamicLogPos bool
136+
131137
// SynchronousEventHandler is used for synchronous event handling.
132138
// This should not be used together with StartBackupWithHandler.
133139
// If this is not nil, GetEvent does not need to be called.
@@ -509,7 +515,14 @@ func (b *BinlogSyncer) writeBinlogDumpCommand(p mysql.Position) error {
509515
binary.LittleEndian.PutUint32(data[pos:], p.Pos)
510516
pos += 4
511517

512-
binary.LittleEndian.PutUint16(data[pos:], b.cfg.DumpCommandFlag)
518+
dumpCommandFlag := b.cfg.DumpCommandFlag
519+
if b.cfg.MariaDBDynamicLogPos && b.cfg.Flavor == mysql.MariaDBFlavor {
520+
// Add BINLOG_SEND_ANNOTATE_ROWS_EVENT flag when MariaDBDynamicLogPos is enabled.
521+
// This ensures the server sends ANNOTATE_ROWS_EVENT events which are needed
522+
// for correct LogPos calculation in MariaDB 11.4+, where some events have LogPos=0.
523+
dumpCommandFlag |= BINLOG_SEND_ANNOTATE_ROWS_EVENT
524+
}
525+
binary.LittleEndian.PutUint16(data[pos:], dumpCommandFlag)
513526
pos += 2
514527

515528
binary.LittleEndian.PutUint32(data[pos:], b.cfg.ServerID)
@@ -861,6 +874,13 @@ func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, need
861874
if e.Header.LogPos > 0 {
862875
// Some events like FormatDescriptionEvent return 0, ignore.
863876
b.nextPos.Pos = e.Header.LogPos
877+
} else if b.shouldCalculateDynamicLogPos(e) {
878+
calculatedPos := b.nextPos.Pos + e.Header.EventSize
879+
e.Header.LogPos = calculatedPos
880+
b.nextPos.Pos = calculatedPos
881+
b.cfg.Logger.Debug("MariaDB dynamic LogPos calculation",
882+
slog.String("eventType", e.Header.EventType.String()),
883+
slog.Uint64("logPos", uint64(calculatedPos)))
864884
}
865885

866886
// Handle event types to update positions and GTID sets
@@ -944,6 +964,19 @@ func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, need
944964
return nil
945965
}
946966

967+
// shouldCalculateDynamicLogPos determines if we should calculate LogPos dynamically for MariaDB events.
968+
// This is needed for MariaDB 11.4+ when:
969+
// 1. MariaDBDynamicLogPos is enabled
970+
// 2. We're using MariaDB flavor
971+
// 3. The event has LogPos=0 (indicating server didn't set it)
972+
// 4. The event is not artificial (not marked with LOG_EVENT_ARTIFICIAL_F flag)
973+
func (b *BinlogSyncer) shouldCalculateDynamicLogPos(e *BinlogEvent) bool {
974+
return b.cfg.MariaDBDynamicLogPos &&
975+
b.cfg.Flavor == mysql.MariaDBFlavor &&
976+
e.Header.LogPos == 0 &&
977+
(e.Header.Flags&LOG_EVENT_ARTIFICIAL_F) == 0
978+
}
979+
947980
// getCurrentGtidSet returns a clone of the current GTID set.
948981
func (b *BinlogSyncer) getCurrentGtidSet() mysql.GTIDSet {
949982
if b.currGset != nil {

0 commit comments

Comments
 (0)