1
+ //! Streaming replication support.
2
+ //!
3
+ //! This module allows writing Postgres replication clients. A
4
+ //! replication client forms a special connection to the server in
5
+ //! either physical replication mode, which receives a stream of raw
6
+ //! Write-Ahead Log (WAL) records; or logical replication mode, which
7
+ //! receives a stream of data that depends on the output plugin
8
+ //! selected. All data and control messages are exchanged in CopyData
9
+ //! envelopes.
10
+ //!
11
+ //! See the [PostgreSQL protocol
12
+ //! documentation](https://www.postgresql.org/docs/current/protocol-replication.html)
13
+ //! for details of the protocol itself.
14
+ //!
15
+ //! # Physical Replication Client Example
16
+ //! ```no_run
17
+ //! extern crate tokio;
18
+ //!
19
+ //! use postgres_protocol::message::backend::ReplicationMessage;
20
+ //! use tokio::stream::StreamExt;
21
+ //! use tokio_postgres::{connect_replication, Error, NoTls, ReplicationMode};
22
+ //!
23
+ //! #[tokio::main]
24
+ //! async fn main() -> Result<(), Error> {
25
+ //! let conninfo = "host=localhost user=postgres dbname=postgres";
26
+ //!
27
+ //! // form replication connection
28
+ //! let (mut rclient, rconnection) =
29
+ //! connect_replication(conninfo, NoTls, ReplicationMode::Physical).await?;
30
+ //! tokio::spawn(async move {
31
+ //! if let Err(e) = rconnection.await {
32
+ //! eprintln!("connection error: {}", e);
33
+ //! }
34
+ //! });
35
+ //!
36
+ //! let identify_system = rclient.identify_system().await?;
37
+ //!
38
+ //! let mut physical_stream = rclient
39
+ //! .start_physical_replication(None, identify_system.xlogpos(), None)
40
+ //! .await?;
41
+ //!
42
+ //! while let Some(replication_message) = physical_stream.next().await {
43
+ //! match replication_message? {
44
+ //! ReplicationMessage::XLogData(xlog_data) => {
45
+ //! eprintln!("received XLogData: {:#?}", xlog_data);
46
+ //! }
47
+ //! ReplicationMessage::PrimaryKeepAlive(keepalive) => {
48
+ //! eprintln!("received PrimaryKeepAlive: {:#?}", keepalive);
49
+ //! }
50
+ //! _ => (),
51
+ //! }
52
+ //! }
53
+ //!
54
+ //! Ok(())
55
+ //! }
56
+ //! ```
57
+ //!
58
+ //! # Logical Replication Client Example
59
+ //!
60
+ //! This example requires the [wal2json
61
+ //! extension](https://github.com/eulerto/wal2json).
62
+ //!
63
+ //! ```no_run
64
+ //! extern crate tokio;
65
+ //!
66
+ //! use postgres_protocol::message::backend::ReplicationMessage;
67
+ //! use tokio::stream::StreamExt;
68
+ //! use tokio_postgres::{connect_replication, Error, NoTls, ReplicationMode};
69
+ //!
70
+ //! #[tokio::main]
71
+ //! async fn main() -> Result<(), Error> {
72
+ //! let conninfo = "host=localhost user=postgres dbname=postgres";
73
+ //!
74
+ //! // form replication connection
75
+ //! let (mut rclient, rconnection) =
76
+ //! connect_replication(conninfo, NoTls, ReplicationMode::Logical).await?;
77
+ //!
78
+ //! // spawn connection to run on its own
79
+ //! tokio::spawn(async move {
80
+ //! if let Err(e) = rconnection.await {
81
+ //! eprintln!("connection error: {}", e);
82
+ //! }
83
+ //! });
84
+ //!
85
+ //! let identify_system = rclient.identify_system().await?;
86
+ //!
87
+ //! let slot = "my_slot";
88
+ //! let plugin = "wal2json";
89
+ //! let options = &vec![("pretty-print", "1")];
90
+ //!
91
+ //! let _slotdesc = rclient
92
+ //! .create_logical_replication_slot(slot, false, plugin, None)
93
+ //! .await?;
94
+ //!
95
+ //! let mut physical_stream = rclient
96
+ //! .start_logical_replication(slot, identify_system.xlogpos(), options)
97
+ //! .await?;
98
+ //!
99
+ //! while let Some(replication_message) = physical_stream.next().await {
100
+ //! match replication_message? {
101
+ //! ReplicationMessage::XLogData(xlog_data) => {
102
+ //! eprintln!("received XLogData: {:#?}", xlog_data);
103
+ //! let json = std::str::from_utf8(xlog_data.data()).unwrap();
104
+ //! eprintln!("JSON text: {}", json);
105
+ //! }
106
+ //! ReplicationMessage::PrimaryKeepAlive(keepalive) => {
107
+ //! eprintln!("received PrimaryKeepAlive: {:#?}", keepalive);
108
+ //! }
109
+ //! _ => (),
110
+ //! }
111
+ //! }
112
+ //!
113
+ //! Ok(())
114
+ //! }
115
+ //! ```
116
+ //!
117
+ //! # Caveats
118
+ //!
119
+ //! It is recommended that you use a PostgreSQL server patch version
120
+ //! of at least: 14.0, 13.2, 12.6, 11.11, 10.16, 9.6.21, or 9.5.25.
121
+
1
122
use crate :: client:: Responses ;
2
123
use crate :: codec:: FrontendMessage ;
3
124
use crate :: connection:: RequestMessages ;
@@ -11,10 +132,12 @@ use postgres_protocol::escape::{escape_identifier, escape_literal};
11
132
use postgres_protocol:: message:: backend:: { Message , ReplicationMessage } ;
12
133
use postgres_protocol:: message:: frontend;
13
134
use std:: marker:: PhantomPinned ;
135
+ use std:: path:: { Path , PathBuf } ;
14
136
use std:: pin:: Pin ;
15
137
use std:: str:: from_utf8;
16
138
use std:: task:: { Context , Poll } ;
17
139
140
+ /// Result of [identify_system()](ReplicationClient::identify_system()) call.
18
141
#[ derive( Debug ) ]
19
142
pub struct IdentifySystem {
20
143
systemid : String ,
@@ -41,29 +164,36 @@ impl IdentifySystem {
41
164
}
42
165
}
43
166
167
+ /// Result of [timeline_history()](ReplicationClient::timeline_history()) call.
44
168
#[ derive( Debug ) ]
45
169
pub struct TimelineHistory {
46
- filename : String ,
47
- content : String ,
170
+ filename : PathBuf ,
171
+ content : Vec < u8 > ,
48
172
}
49
173
50
174
impl TimelineHistory {
51
- pub fn filename ( & self ) -> & str {
52
- & self . filename
175
+ pub fn filename ( & self ) -> & Path {
176
+ self . filename . as_path ( )
53
177
}
54
178
55
- pub fn content ( & self ) -> & str {
56
- & self . content
179
+ pub fn content ( & self ) -> & [ u8 ] {
180
+ self . content . as_slice ( )
57
181
}
58
182
}
59
183
184
+ /// Argument to
185
+ /// [create_logical_replication_slot()](ReplicationClient::create_logical_replication_slot).
60
186
#[ derive( Debug ) ]
61
187
pub enum SnapshotMode {
62
188
ExportSnapshot ,
63
189
NoExportSnapshot ,
64
190
UseSnapshot ,
65
191
}
66
192
193
+ /// Description of slot created with
194
+ /// [create_physical_replication_slot()](ReplicationClient::create_physical_replication_slot)
195
+ /// or
196
+ /// [create_logical_replication_slot()](ReplicationClient::create_logical_replication_slot).
67
197
#[ derive( Debug ) ]
68
198
pub struct CreateReplicationSlotResponse {
69
199
slot_name : String ,
@@ -90,127 +220,7 @@ impl CreateReplicationSlotResponse {
90
220
}
91
221
}
92
222
93
- /// Streaming replication support.
94
- ///
95
- /// This module allows writing Postgres replication clients. A
96
- /// replication client forms a special connection to the server in
97
- /// either physical replication mode, which receives a stream of raw
98
- /// Write-Ahead Log (WAL) records; or logical replication mode, which
99
- /// receives a stream of data that depends on the output plugin
100
- /// selected. All data and control messages are exchanged in CopyData
101
- /// envelopes.
102
- ///
103
- /// See the [PostgreSQL protocol
104
- /// documentation](https://www.postgresql.org/docs/current/protocol-replication.html)
105
- /// for details of the protocol itself.
106
- ///
107
- /// # Physical Replication Client Example
108
- /// ```no_run
109
- /// extern crate tokio;
110
- ///
111
- /// use postgres_protocol::message::backend::ReplicationMessage;
112
- /// use tokio::stream::StreamExt;
113
- /// use tokio_postgres::{connect_replication, Error, NoTls, ReplicationMode};
114
- ///
115
- /// #[tokio::main]
116
- /// async fn main() -> Result<(), Error> {
117
- /// let conninfo = "host=localhost user=postgres dbname=postgres";
118
- ///
119
- /// // form replication connection
120
- /// let (mut rclient, rconnection) =
121
- /// connect_replication(conninfo, NoTls, ReplicationMode::Physical).await?;
122
- /// tokio::spawn(async move {
123
- /// if let Err(e) = rconnection.await {
124
- /// eprintln!("connection error: {}", e);
125
- /// }
126
- /// });
127
- ///
128
- /// let identify_system = rclient.identify_system().await?;
129
- ///
130
- /// let mut physical_stream = rclient
131
- /// .start_physical_replication(None, identify_system.xlogpos(), None)
132
- /// .await?;
133
- ///
134
- /// while let Some(replication_message) = physical_stream.next().await {
135
- /// match replication_message? {
136
- /// ReplicationMessage::XLogData(xlog_data) => {
137
- /// eprintln!("received XLogData: {:#?}", xlog_data);
138
- /// }
139
- /// ReplicationMessage::PrimaryKeepAlive(keepalive) => {
140
- /// eprintln!("received PrimaryKeepAlive: {:#?}", keepalive);
141
- /// }
142
- /// _ => (),
143
- /// }
144
- /// }
145
- ///
146
- /// Ok(())
147
- /// }
148
- /// ```
149
- ///
150
- /// # Logical Replication Client Example
151
- ///
152
- /// This example requires the [wal2json
153
- /// extension](https://github.com/eulerto/wal2json).
154
- ///
155
- /// ```no_run
156
- /// extern crate tokio;
157
- ///
158
- /// use postgres_protocol::message::backend::ReplicationMessage;
159
- /// use tokio::stream::StreamExt;
160
- /// use tokio_postgres::{connect_replication, Error, NoTls, ReplicationMode};
161
- ///
162
- /// #[tokio::main]
163
- /// async fn main() -> Result<(), Error> {
164
- /// let conninfo = "host=localhost user=postgres dbname=postgres";
165
- ///
166
- /// // form replication connection
167
- /// let (mut rclient, rconnection) =
168
- /// connect_replication(conninfo, NoTls, ReplicationMode::Logical).await?;
169
- ///
170
- /// // spawn connection to run on its own
171
- /// tokio::spawn(async move {
172
- /// if let Err(e) = rconnection.await {
173
- /// eprintln!("connection error: {}", e);
174
- /// }
175
- /// });
176
- ///
177
- /// let identify_system = rclient.identify_system().await?;
178
- ///
179
- /// let slot = "my_slot";
180
- /// let plugin = "wal2json";
181
- /// let options = &vec![("pretty-print", "1")];
182
- ///
183
- /// let _slotdesc = rclient
184
- /// .create_logical_replication_slot(slot, false, plugin, None)
185
- /// .await?;
186
- ///
187
- /// let mut physical_stream = rclient
188
- /// .start_logical_replication(slot, identify_system.xlogpos(), options)
189
- /// .await?;
190
- ///
191
- /// while let Some(replication_message) = physical_stream.next().await {
192
- /// match replication_message? {
193
- /// ReplicationMessage::XLogData(xlog_data) => {
194
- /// eprintln!("received XLogData: {:#?}", xlog_data);
195
- /// let json = std::str::from_utf8(xlog_data.data()).unwrap();
196
- /// eprintln!("JSON text: {}", json);
197
- /// }
198
- /// ReplicationMessage::PrimaryKeepAlive(keepalive) => {
199
- /// eprintln!("received PrimaryKeepAlive: {:#?}", keepalive);
200
- /// }
201
- /// _ => (),
202
- /// }
203
- /// }
204
- ///
205
- /// Ok(())
206
- /// }
207
- /// ```
208
- ///
209
- /// # Caveats
210
- ///
211
- /// It is recommended that you upgrade your server to the latest
212
- /// patch version to fix a protocol implementation bug. Use at least
213
- /// versions: 13.2, 12.6, 11.11, 10.16, 9.6.21, 9.5.25.
223
+ /// Represents a client connected in replication mode.
214
224
pub struct ReplicationClient {
215
225
client : Client ,
216
226
replication_stream_active : bool ,
@@ -348,9 +358,11 @@ impl ReplicationClient {
348
358
let filename = & datarow. buffer ( ) [ ranges[ 0 ] . to_owned ( ) . unwrap ( ) ] ;
349
359
let content = & datarow. buffer ( ) [ ranges[ 1 ] . to_owned ( ) . unwrap ( ) ] ;
350
360
361
+ let filename_path = PathBuf :: from ( from_utf8 ( filename) . unwrap ( ) ) ;
362
+
351
363
Ok ( TimelineHistory {
352
- filename : from_utf8 ( filename ) . unwrap ( ) . to_string ( ) ,
353
- content : from_utf8 ( content) . unwrap ( ) . to_string ( ) ,
364
+ filename : filename_path ,
365
+ content : Vec :: from ( content) ,
354
366
} )
355
367
}
356
368
@@ -615,7 +627,9 @@ impl ReplicationClient {
615
627
}
616
628
}
617
629
618
- /// A stream of data from a `START_REPLICATION` command.
630
+ /// A stream of data from a `START_REPLICATION` command. All control
631
+ /// and data messages will be in
632
+ /// [CopyData](postgres_protocol::message::backend::Message::CopyData).
619
633
///
620
634
/// Intended to be used with the [next()](tokio::stream::StreamExt::next) method.
621
635
#[ pin_project( PinnedDrop ) ]
0 commit comments