8383
8484from .standby import First
8585
86+ from . import utils
87+
8688from .utils import \
8789 PgVer , \
8890 eprint , \
8991 get_bin_path , \
9092 get_pg_version , \
91- reserve_port , \
92- release_port , \
9393 execute_utility , \
9494 options_string , \
9595 clean_on_error
@@ -128,6 +128,9 @@ def __repr__(self):
128128
129129
130130class PostgresNode (object ):
131+ # a max number of node start attempts
132+ _C_MAX_START_ATEMPTS = 5
133+
131134 def __init__ (self , name = None , base_dir = None , port = None , conn_params : ConnectionParams = ConnectionParams (), bin_dir = None , prefix = None ):
132135 """
133136 PostgresNode constructor.
@@ -158,7 +161,7 @@ def __init__(self, name=None, base_dir=None, port=None, conn_params: ConnectionP
158161 self .os_ops = LocalOperations (conn_params )
159162
160163 self .host = self .os_ops .host
161- self .port = port or reserve_port ()
164+ self .port = port or utils . reserve_port ()
162165
163166 self .ssh_key = self .os_ops .ssh_key
164167
@@ -471,6 +474,28 @@ def _collect_special_files(self):
471474
472475 return result
473476
477+ def _collect_log_files (self ):
478+ # dictionary of log files + size in bytes
479+
480+ files = [
481+ self .pg_log_file
482+ ] # yapf: disable
483+
484+ result = {}
485+
486+ for f in files :
487+ # skip missing files
488+ if not self .os_ops .path_exists (f ):
489+ continue
490+
491+ file_size = self .os_ops .get_file_size (f )
492+ assert type (file_size ) == int # noqa: E721
493+ assert file_size >= 0
494+
495+ result [f ] = file_size
496+
497+ return result
498+
474499 def init (self , initdb_params = None , cached = True , ** kwargs ):
475500 """
476501 Perform initdb for this node.
@@ -722,6 +747,22 @@ def slow_start(self, replica=False, dbname='template1', username=None, max_attem
722747 OperationalError },
723748 max_attempts = max_attempts )
724749
750+ def _detect_port_conflict (self , log_files0 , log_files1 ):
751+ assert type (log_files0 ) == dict # noqa: E721
752+ assert type (log_files1 ) == dict # noqa: E721
753+
754+ for file in log_files1 .keys ():
755+ read_pos = 0
756+
757+ if file in log_files0 .keys ():
758+ read_pos = log_files0 [file ] # the previous size
759+
760+ file_content = self .os_ops .read_binary (file , read_pos )
761+ file_content_s = file_content .decode ()
762+ if 'Is another postmaster already running on port' in file_content_s :
763+ return True
764+ return False
765+
725766 def start (self , params = [], wait = True ):
726767 """
727768 Starts the PostgreSQL node using pg_ctl if node has not been started.
@@ -736,6 +777,9 @@ def start(self, params=[], wait=True):
736777 Returns:
737778 This instance of :class:`.PostgresNode`.
738779 """
780+
781+ assert __class__ ._C_MAX_START_ATEMPTS > 1
782+
739783 if self .is_started :
740784 return self
741785
@@ -745,27 +789,46 @@ def start(self, params=[], wait=True):
745789 "-w" if wait else '-W' , # --wait or --no-wait
746790 "start" ] + params # yapf: disable
747791
748- startup_retries = 5
792+ log_files0 = self ._collect_log_files ()
793+ assert type (log_files0 ) == dict # noqa: E721
794+
795+ nAttempt = 0
796+ timeout = 1
749797 while True :
798+ assert nAttempt >= 0
799+ assert nAttempt < __class__ ._C_MAX_START_ATEMPTS
800+ nAttempt += 1
750801 try :
751802 exit_status , out , error = execute_utility (_params , self .utils_log_file , verbose = True )
752803 if error and 'does not exist' in error :
753804 raise Exception
754805 except Exception as e :
755- files = self ._collect_special_files ()
756- if any (len (file ) > 1 and 'Is another postmaster already '
757- 'running on port' in file [1 ].decode () for
758- file in files ):
759- logging .warning ("Detected an issue with connecting to port {0}. "
760- "Trying another port after a 5-second sleep..." .format (self .port ))
761- self .port = reserve_port ()
762- options = {'port' : str (self .port )}
763- self .set_auto_conf (options )
764- startup_retries -= 1
765- time .sleep (5 )
766- continue
806+ assert nAttempt > 0
807+ assert nAttempt <= __class__ ._C_MAX_START_ATEMPTS
808+ if self ._should_free_port and nAttempt < __class__ ._C_MAX_START_ATEMPTS :
809+ log_files1 = self ._collect_log_files ()
810+ if self ._detect_port_conflict (log_files0 , log_files1 ):
811+ log_files0 = log_files1
812+ logging .warning (
813+ "Detected an issue with connecting to port {0}. "
814+ "Trying another port after a {1}-second sleep..." .format (self .port , timeout )
815+ )
816+ time .sleep (timeout )
817+ timeout = min (2 * timeout , 5 )
818+ cur_port = self .port
819+ new_port = utils .reserve_port () # can raise
820+ try :
821+ options = {'port' : str (new_port )}
822+ self .set_auto_conf (options )
823+ except : # noqa: E722
824+ utils .release_port (new_port )
825+ raise
826+ self .port = new_port
827+ utils .release_port (cur_port )
828+ continue
767829
768830 msg = 'Cannot start node'
831+ files = self ._collect_special_files ()
769832 raise_from (StartNodeException (msg , files ), e )
770833 break
771834 self ._maybe_start_logger ()
@@ -930,8 +993,10 @@ def free_port(self):
930993 """
931994
932995 if self ._should_free_port :
996+ port = self .port
933997 self ._should_free_port = False
934- release_port (self .port )
998+ self .port = None
999+ utils .release_port (port )
9351000
9361001 def cleanup (self , max_attempts = 3 , full = False ):
9371002 """
0 commit comments