117
117
//! # Caveats
118
118
//!
119
119
//! It is recommended that you use a PostgreSQL server patch version
120
- //! of at least: 14.0, 13.2, 12.6, 11.11, 10.16, 9.6.21, or 9.5.25.
120
+ //! of at least: 14.0, 13.2, 12.6, 11.11, 10.16, 9.6.21, or
121
+ //! 9.5.25. Earlier patch levels have a bug that doesn't properly
122
+ //! handle pipelined requests after streaming has stopped.
121
123
122
124
use crate :: client:: Responses ;
123
125
use crate :: codec:: FrontendMessage ;
@@ -131,6 +133,7 @@ use pin_project::{pin_project, pinned_drop};
131
133
use postgres_protocol:: escape:: { escape_identifier, escape_literal} ;
132
134
use postgres_protocol:: message:: backend:: { Message , ReplicationMessage , RowDescriptionBody } ;
133
135
use postgres_protocol:: message:: frontend;
136
+ use std:: io;
134
137
use std:: marker:: PhantomPinned ;
135
138
use std:: path:: { Path , PathBuf } ;
136
139
use std:: pin:: Pin ;
@@ -275,13 +278,7 @@ pub struct ReplicationClient {
275
278
}
276
279
277
280
impl ReplicationClient {
278
- pub ( crate ) fn new ( client : Client ) -> ReplicationClient {
279
- ReplicationClient { client : client }
280
- }
281
- }
282
-
283
- impl ReplicationClient {
284
- /// IDENTIFY_SYSTEM message
281
+ /// Requests the server to identify itself.
285
282
pub async fn identify_system ( & mut self ) -> Result < IdentifySystem , Error > {
286
283
let command = "IDENTIFY_SYSTEM" ;
287
284
let mut responses = self . send ( command) . await ?;
@@ -299,13 +296,9 @@ impl ReplicationClient {
299
296
300
297
assert_eq ! ( fields. len( ) , 4 ) ;
301
298
assert_eq ! ( fields[ 0 ] . type_oid( ) , Type :: TEXT . oid( ) ) ;
302
- assert_eq ! ( fields[ 0 ] . format( ) , 0 ) ;
303
299
assert_eq ! ( fields[ 1 ] . type_oid( ) , Type :: INT4 . oid( ) ) ;
304
- assert_eq ! ( fields[ 1 ] . format( ) , 0 ) ;
305
300
assert_eq ! ( fields[ 2 ] . type_oid( ) , Type :: TEXT . oid( ) ) ;
306
- assert_eq ! ( fields[ 2 ] . format( ) , 0 ) ;
307
301
assert_eq ! ( fields[ 3 ] . type_oid( ) , Type :: TEXT . oid( ) ) ;
308
- assert_eq ! ( fields[ 3 ] . format( ) , 0 ) ;
309
302
assert_eq ! ( ranges. len( ) , 4 ) ;
310
303
311
304
let values: Vec < Option < & str > > = ranges
@@ -325,7 +318,9 @@ impl ReplicationClient {
325
318
} )
326
319
}
327
320
328
- /// show the value of the given setting
321
+ /// Requests the server to send the current setting of a run-time
322
+ /// parameter. This is similar to the SQL command
323
+ /// [SHOW](https://www.postgresql.org/docs/current/sql-show.html).
329
324
pub async fn show ( & mut self , name : & str ) -> Result < String , Error > {
330
325
let command = format ! ( "SHOW {}" , escape_identifier( name) ) ;
331
326
let mut responses = self . send ( & command) . await ?;
@@ -350,8 +345,11 @@ impl ReplicationClient {
350
345
Ok ( String :: from ( val) )
351
346
}
352
347
353
- /// show the value of the given setting
354
- pub async fn timeline_history ( & mut self , timeline_id : u32 ) -> Result < TimelineHistory , Error > {
348
+ /// Requests the server to send over the timeline history file for
349
+ /// the given timeline ID.
350
+ pub async fn timeline_history (
351
+ & mut self , timeline_id : u32 ,
352
+ ) -> Result < TimelineHistory , Error > {
355
353
let command = format ! ( "TIMELINE_HISTORY {}" , timeline_id) ;
356
354
let mut responses = self . send ( & command) . await ?;
357
355
@@ -367,27 +365,57 @@ impl ReplicationClient {
367
365
let fields = rowdesc. fields ( ) . collect :: < Vec < _ > > ( ) . map_err ( Error :: parse) ?;
368
366
let ranges = datarow. ranges ( ) . collect :: < Vec < _ > > ( ) . map_err ( Error :: parse) ?;
369
367
368
+ // The TIMELINE_HISTORY command sends a misleading
369
+ // RowDescriptor which is different depending on the server
370
+ // version, so we ignore it aside from checking for the right
371
+ // number of fields. Both fields are documented to be raw
372
+ // bytes.
373
+ //
374
+ // Both fields are documented to return raw bytes.
370
375
assert_eq ! ( fields. len( ) , 2 ) ;
371
-
372
- assert_eq ! ( fields[ 0 ] . type_oid( ) , Type :: TEXT . oid( ) ) ;
373
- assert_eq ! ( fields[ 0 ] . format( ) , 0 ) ;
374
- assert_eq ! ( fields[ 1 ] . type_oid( ) , Type :: TEXT . oid( ) ) ;
375
- assert_eq ! ( fields[ 1 ] . format( ) , 0 ) ;
376
-
377
376
assert_eq ! ( ranges. len( ) , 2 ) ;
378
377
379
- let filename = & datarow. buffer ( ) [ ranges[ 0 ] . to_owned ( ) . unwrap ( ) ] ;
380
- let content = & datarow. buffer ( ) [ ranges[ 1 ] . to_owned ( ) . unwrap ( ) ] ;
381
-
382
- let filename_path = PathBuf :: from ( from_utf8 ( filename) . unwrap ( ) ) ;
378
+ // Practically speaking, the filename is ASCII, so it's OK to
379
+ // treat it as UTF-8, and convert it to a PathBuf. If this
380
+ // assumption is violated, generate a useful error message.
381
+ let filename_bytes = & datarow. buffer ( ) [ ranges[ 0 ] . to_owned ( ) . unwrap ( ) ] ;
382
+ let filename_str = from_utf8 ( filename_bytes) . map_err ( |_| {
383
+ io:: Error :: new ( io:: ErrorKind :: InvalidData ,
384
+ "Timeline history filename is invalid UTF-8" )
385
+ } ) . map_err ( Error :: parse) ?;
386
+ let filename_path = PathBuf :: from ( filename_str) ;
387
+
388
+ // The file contents are typically ASCII, but we treat it as
389
+ // binary because it can contain text in an unknown
390
+ // encoding. For instance, the restore point name will be in
391
+ // the server encoding (it will not be converted to the client
392
+ // encoding before being sent); and the file can also be
393
+ // edited by the user to contain arbitrary comments in an
394
+ // unknown encoding.
395
+ let content_bytes = & datarow. buffer ( ) [ ranges[ 1 ] . to_owned ( ) . unwrap ( ) ] ;
383
396
384
397
Ok ( TimelineHistory {
385
398
filename : filename_path,
386
- content : Vec :: from ( content ) ,
399
+ content : Vec :: from ( content_bytes ) ,
387
400
} )
388
401
}
389
402
390
- /// Create physical replication slot
403
+ /// Create logical replication slot. See [Replication
404
+ /// Slots](https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS).
405
+ ///
406
+ /// Arguments:
407
+ ///
408
+ /// * `slot_name`: The name of the slot to create. Must be a valid
409
+ /// replication slot name (see [Querying and Manipulating
410
+ /// Replication
411
+ /// Slots](https://www.postgresql.org/docs/13/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION)).
412
+ /// * `temporary`: Specify that this replication slot is a
413
+ /// temporary one. Temporary slots are not saved to disk and are
414
+ /// automatically dropped on error or when the session has
415
+ /// finished.
416
+ /// * `reserve_wal`: Specify that this physical replication slot
417
+ /// reserves WAL immediately. Otherwise, WAL is only reserved
418
+ /// upon connection from a streaming replication client.
391
419
pub async fn create_physical_replication_slot (
392
420
& mut self ,
393
421
slot_name : & str ,
@@ -444,7 +472,24 @@ impl ReplicationClient {
444
472
} )
445
473
}
446
474
447
- /// Create logical replication slot.
475
+ /// Create logical replication slot. See [Replication
476
+ /// Slots](https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS).
477
+ ///
478
+ /// Arguments:
479
+ ///
480
+ /// * `slot_name`: The name of the slot to create. Must be a valid
481
+ /// replication slot name (see [Querying and Manipulating
482
+ /// Replication
483
+ /// Slots](https://www.postgresql.org/docs/13/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION)).
484
+ /// * `temporary`: Specify that this replication slot is a
485
+ /// temporary one. Temporary slots are not saved to disk and are
486
+ /// automatically dropped on error or when the session has
487
+ /// finished.
488
+ /// * `plugin_name`: The name of the output plugin used for
489
+ /// logical decoding (see [Logical Decoding Output
490
+ /// Plugins](https://www.postgresql.org/docs/current/logicaldecoding-output-plugin.html)).
491
+ /// * `snapshot_mode`: Decides what to do with the snapshot
492
+ /// created during logical slot initialization.
448
493
pub async fn create_logical_replication_slot (
449
494
& mut self ,
450
495
slot_name : & str ,
@@ -498,7 +543,10 @@ impl ReplicationClient {
498
543
} )
499
544
}
500
545
501
- /// Drop replication slot
546
+ /// Drops a replication slot, freeing any reserved server-side
547
+ /// resources. If the slot is a logical slot that was created in a
548
+ /// database other than the database the walsender is connected
549
+ /// to, this command fails.
502
550
pub async fn drop_replication_slot (
503
551
& mut self ,
504
552
slot_name : & str ,
@@ -514,10 +562,18 @@ impl ReplicationClient {
514
562
Ok ( ( ) )
515
563
}
516
564
517
- /// Begin physical replication, consuming the replication client and producing a replication stream.
565
+ /// Begin physical replication, consuming the replication client
566
+ /// and producing a replication stream.
518
567
///
519
- /// Replication begins starting with the given Log Sequence Number
520
- /// (LSN) on the given timeline.
568
+ /// Arguments:
569
+ ///
570
+ /// * `slot_name`: If a slot's name is provided via slot_name, it
571
+ /// will be updated as replication progresses so that the server
572
+ /// knows which WAL segments, and if hot_standby_feedback is on
573
+ /// which transactions, are still needed by the standby.
574
+ /// * `lsn`: The starting WAL location.
575
+ /// * `timeline_id`: If specified, streaming starts on timeline
576
+ /// tli; otherwise, the server's current timeline is selected.
521
577
pub async fn start_physical_replication < ' a > (
522
578
& ' a mut self ,
523
579
slot_name : Option < & str > ,
@@ -544,8 +600,15 @@ impl ReplicationClient {
544
600
545
601
/// Begin logical replication, consuming the replication client and producing a replication stream.
546
602
///
547
- /// Replication begins starting with the given Log Sequence Number
548
- /// (LSN) on the current timeline.
603
+ /// Arguments:
604
+ ///
605
+ /// * `slot_name`: If a slot's name is provided via slot_name, it
606
+ /// will be updated as replication progresses so that the server
607
+ /// knows which WAL segments, and if hot_standby_feedback is on
608
+ /// which transactions, are still needed by the standby.
609
+ /// * `lsn`: The starting WAL location.
610
+ /// * `options`: (name, value) pairs of options passed to the
611
+ /// slot's logical decoding plugin.
549
612
pub async fn start_logical_replication < ' a > (
550
613
& ' a mut self ,
551
614
slot_name : & str ,
@@ -601,6 +664,10 @@ impl ReplicationClient {
601
664
602
665
// Private methods
603
666
667
+ pub ( crate ) fn new ( client : Client ) -> ReplicationClient {
668
+ ReplicationClient { client : client }
669
+ }
670
+
604
671
// send command to the server, but finish any unfinished replication stream, first
605
672
async fn send ( & mut self , command : & str ) -> Result < Responses , Error > {
606
673
let iclient = self . client . inner ( ) ;
0 commit comments