Skip to content

Commit 7fe532b

Browse files
jeff-davispetrosagg
andcommitted
decoding logic for replication protocol
Co-authored-by: Petros Angelatos <[email protected]>
1 parent abb107e commit 7fe532b

File tree

1 file changed

+163
-0
lines changed

1 file changed

+163
-0
lines changed

postgres-protocol/src/message/backend.rs

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::str;
1111

1212
use crate::Oid;
1313

14+
// top-level message tags
1415
pub const PARSE_COMPLETE_TAG: u8 = b'1';
1516
pub const BIND_COMPLETE_TAG: u8 = b'2';
1617
pub const CLOSE_COMPLETE_TAG: u8 = b'3';
@@ -22,6 +23,7 @@ pub const DATA_ROW_TAG: u8 = b'D';
2223
pub const ERROR_RESPONSE_TAG: u8 = b'E';
2324
pub const COPY_IN_RESPONSE_TAG: u8 = b'G';
2425
pub const COPY_OUT_RESPONSE_TAG: u8 = b'H';
26+
pub const COPY_BOTH_RESPONSE_TAG: u8 = b'W';
2527
pub const EMPTY_QUERY_RESPONSE_TAG: u8 = b'I';
2628
pub const BACKEND_KEY_DATA_TAG: u8 = b'K';
2729
pub const NO_DATA_TAG: u8 = b'n';
@@ -33,6 +35,10 @@ pub const PARAMETER_DESCRIPTION_TAG: u8 = b't';
3335
pub const ROW_DESCRIPTION_TAG: u8 = b'T';
3436
pub const READY_FOR_QUERY_TAG: u8 = b'Z';
3537

38+
// replication message tags
39+
pub const XLOG_DATA_TAG: u8 = b'w';
40+
pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k';
41+
3642
#[derive(Debug, Copy, Clone)]
3743
pub struct Header {
3844
tag: u8,
@@ -93,6 +99,7 @@ pub enum Message {
9399
CopyDone,
94100
CopyInResponse(CopyInResponseBody),
95101
CopyOutResponse(CopyOutResponseBody),
102+
CopyBothResponse(CopyBothResponseBody),
96103
DataRow(DataRowBody),
97104
EmptyQueryResponse,
98105
ErrorResponse(ErrorResponseBody),
@@ -190,6 +197,16 @@ impl Message {
190197
storage,
191198
})
192199
}
200+
COPY_BOTH_RESPONSE_TAG => {
201+
let format = buf.read_u8()?;
202+
let len = buf.read_u16::<BigEndian>()?;
203+
let storage = buf.read_all();
204+
Message::CopyBothResponse(CopyBothResponseBody {
205+
format,
206+
len,
207+
storage,
208+
})
209+
}
193210
EMPTY_QUERY_RESPONSE_TAG => Message::EmptyQueryResponse,
194211
BACKEND_KEY_DATA_TAG => {
195212
let process_id = buf.read_i32::<BigEndian>()?;
@@ -278,6 +295,59 @@ impl Message {
278295
}
279296
}
280297

298+
/// An enum representing Postgres backend replication messages.
299+
#[non_exhaustive]
300+
#[derive(Debug)]
301+
pub enum ReplicationMessage<D> {
302+
XLogData(XLogDataBody<D>),
303+
PrimaryKeepAlive(PrimaryKeepAliveBody),
304+
}
305+
306+
impl ReplicationMessage<Bytes> {
307+
#[inline]
308+
pub fn parse(buf: &Bytes) -> io::Result<Self> {
309+
let mut buf = Buffer {
310+
bytes: buf.clone(),
311+
idx: 0,
312+
};
313+
314+
let tag = buf.read_u8()?;
315+
316+
let replication_message = match tag {
317+
XLOG_DATA_TAG => {
318+
let wal_start = buf.read_u64::<BigEndian>()?;
319+
let wal_end = buf.read_u64::<BigEndian>()?;
320+
let timestamp = buf.read_i64::<BigEndian>()?;
321+
let data = buf.read_all();
322+
ReplicationMessage::XLogData(XLogDataBody {
323+
wal_start,
324+
wal_end,
325+
timestamp,
326+
data,
327+
})
328+
}
329+
PRIMARY_KEEPALIVE_TAG => {
330+
let wal_end = buf.read_u64::<BigEndian>()?;
331+
let timestamp = buf.read_i64::<BigEndian>()?;
332+
let reply = buf.read_u8()?;
333+
ReplicationMessage::PrimaryKeepAlive(PrimaryKeepAliveBody {
334+
wal_end,
335+
timestamp,
336+
reply,
337+
})
338+
}
339+
tag => {
340+
return Err(io::Error::new(
341+
io::ErrorKind::InvalidInput,
342+
format!("unknown replication message tag `{}`", tag),
343+
));
344+
}
345+
};
346+
347+
Ok(replication_message)
348+
}
349+
}
350+
281351
struct Buffer {
282352
bytes: Bytes,
283353
idx: usize,
@@ -524,6 +594,27 @@ impl CopyOutResponseBody {
524594
}
525595
}
526596

597+
pub struct CopyBothResponseBody {
598+
storage: Bytes,
599+
len: u16,
600+
format: u8,
601+
}
602+
603+
impl CopyBothResponseBody {
604+
#[inline]
605+
pub fn format(&self) -> u8 {
606+
self.format
607+
}
608+
609+
#[inline]
610+
pub fn column_formats(&self) -> ColumnFormats<'_> {
611+
ColumnFormats {
612+
remaining: self.len,
613+
buf: &self.storage,
614+
}
615+
}
616+
}
617+
527618
pub struct DataRowBody {
528619
storage: Bytes,
529620
len: u16,
@@ -776,6 +867,78 @@ impl RowDescriptionBody {
776867
}
777868
}
778869

870+
#[derive(Debug)]
871+
pub struct XLogDataBody<D> {
872+
wal_start: u64,
873+
wal_end: u64,
874+
timestamp: i64,
875+
data: D,
876+
}
877+
878+
impl<D> XLogDataBody<D> {
879+
#[inline]
880+
pub fn wal_start(&self) -> u64 {
881+
self.wal_start
882+
}
883+
884+
#[inline]
885+
pub fn wal_end(&self) -> u64 {
886+
self.wal_end
887+
}
888+
889+
#[inline]
890+
pub fn timestamp(&self) -> i64 {
891+
self.timestamp
892+
}
893+
894+
#[inline]
895+
pub fn data(&self) -> &D {
896+
&self.data
897+
}
898+
899+
#[inline]
900+
pub fn into_data(self) -> D {
901+
self.data
902+
}
903+
904+
pub fn map_data<F, D2, E>(self, f: F) -> Result<XLogDataBody<D2>, E>
905+
where
906+
F: Fn(D) -> Result<D2, E>,
907+
{
908+
let data = f(self.data)?;
909+
Ok(XLogDataBody {
910+
wal_start: self.wal_start,
911+
wal_end: self.wal_end,
912+
timestamp: self.timestamp,
913+
data,
914+
})
915+
}
916+
}
917+
918+
#[derive(Debug)]
919+
pub struct PrimaryKeepAliveBody {
920+
wal_end: u64,
921+
timestamp: i64,
922+
reply: u8,
923+
}
924+
925+
impl PrimaryKeepAliveBody {
926+
#[inline]
927+
pub fn wal_end(&self) -> u64 {
928+
self.wal_end
929+
}
930+
931+
#[inline]
932+
pub fn timestamp(&self) -> i64 {
933+
self.timestamp
934+
}
935+
936+
#[inline]
937+
pub fn reply(&self) -> u8 {
938+
self.reply
939+
}
940+
}
941+
779942
pub struct Fields<'a> {
780943
buf: &'a [u8],
781944
remaining: u16,

0 commit comments

Comments
 (0)