From 94154bafe33ad48cdd4271a72cee6e9bb1c0be04 Mon Sep 17 00:00:00 2001 From: Rodrigo Navarro Date: Sun, 6 Jul 2025 14:13:44 -0300 Subject: [PATCH] Proper decoding logical replication messages --- postgres-replication/src/protocol.rs | 55 +++++++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/postgres-replication/src/protocol.rs b/postgres-replication/src/protocol.rs index 3f3899121..d0d110854 100644 --- a/postgres-replication/src/protocol.rs +++ b/postgres-replication/src/protocol.rs @@ -12,6 +12,7 @@ pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k'; // logical replication message tags const BEGIN_TAG: u8 = b'B'; +const MESSAGE_TAG: u8 = b'M'; const COMMIT_TAG: u8 = b'C'; const ORIGIN_TAG: u8 = b'O'; const RELATION_TAG: u8 = b'R'; @@ -165,7 +166,9 @@ impl PrimaryKeepAliveBody { pub enum LogicalReplicationMessage { /// A BEGIN statement Begin(BeginBody), - /// A BEGIN statement + /// A logical decoding message + Message(MessageBody), + /// A COMMIT statement Commit(CommitBody), /// An Origin replication message /// Note that there can be multiple Origin messages inside a single transaction. @@ -199,6 +202,21 @@ impl LogicalReplicationMessage { timestamp: buf.read_i64::()?, xid: buf.read_u32::()?, }), + MESSAGE_TAG => Self::Message(MessageBody { + flags: buf.read_i8()?, + message_lsn: buf.read_u64::()?, + prefix: buf.read_cstr()?, + content: match buf.read_i32::()? { + len if len > 0 => buf.read_buf(len as usize)?, + 0 => Bytes::new(), + len => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("unexpected message content length `{len}`"), + )) + } + }, + }), COMMIT_TAG => Self::Commit(CommitBody { flags: buf.read_i8()?, commit_lsn: buf.read_u64::()?, @@ -491,6 +509,41 @@ impl BeginBody { } } +/// A logical decoding message +#[derive(Debug)] +pub struct MessageBody { + message_lsn: u64, + flags: i8, + prefix: Bytes, + content: Bytes, +} + +impl MessageBody { + #[inline] + /// The LSN of the logical decoding message. + pub fn message_lsn(&self) -> Lsn { + self.message_lsn + } + + #[inline] + /// Flags. Currently can be either 0 for no flags or 1 if the logical decoding message is transactional. + pub fn flags(&self) -> i8 { + self.flags + } + + #[inline] + /// The prefix of the logical decoding message. + pub fn prefix(&self) -> io::Result<&str> { + get_str(&self.prefix) + } + + #[inline] + /// The content of the logical decoding message. + pub fn content(&self) -> &Bytes { + &self.content + } +} + /// A COMMIT statement #[derive(Debug)] pub struct CommitBody {