@@ -212,7 +212,7 @@ class BrokerConnection(object):
212212        'ssl_ciphers' : None ,
213213        'api_version' : (0 , 8 , 2 ),  # default to most restrictive 
214214        'selector' : selectors .DefaultSelector ,
215-         'state_change_callback' : lambda  conn : True ,
215+         'state_change_callback' : lambda  node_id ,  sock ,  conn : True ,
216216        'metrics' : None ,
217217        'metric_group_prefix' : '' ,
218218        'sasl_mechanism' : None ,
@@ -357,6 +357,7 @@ def connect(self):
357357                return  self .state 
358358            else :
359359                log .debug ('%s: creating new socket' , self )
360+                 assert  self ._sock  is  None 
360361                self ._sock_afi , self ._sock_addr  =  next_lookup 
361362                self ._sock  =  socket .socket (self ._sock_afi , socket .SOCK_STREAM )
362363
@@ -366,7 +367,7 @@ def connect(self):
366367
367368            self ._sock .setblocking (False )
368369            self .state  =  ConnectionStates .CONNECTING 
369-             self .config ['state_change_callback' ](self )
370+             self .config ['state_change_callback' ](self . node_id ,  self . _sock ,  self )
370371            log .info ('%s: connecting to %s:%d [%s %s]' , self , self .host ,
371372                     self .port , self ._sock_addr , AFI_NAMES [self ._sock_afi ])
372373
@@ -386,21 +387,21 @@ def connect(self):
386387                if  self .config ['security_protocol' ] in  ('SSL' , 'SASL_SSL' ):
387388                    log .debug ('%s: initiating SSL handshake' , self )
388389                    self .state  =  ConnectionStates .HANDSHAKE 
389-                     self .config ['state_change_callback' ](self )
390+                     self .config ['state_change_callback' ](self . node_id ,  self . _sock ,  self )
390391                    # _wrap_ssl can alter the connection state -- disconnects on failure 
391392                    self ._wrap_ssl ()
392393
393394                elif  self .config ['security_protocol' ] ==  'SASL_PLAINTEXT' :
394395                    log .debug ('%s: initiating SASL authentication' , self )
395396                    self .state  =  ConnectionStates .AUTHENTICATING 
396-                     self .config ['state_change_callback' ](self )
397+                     self .config ['state_change_callback' ](self . node_id ,  self . _sock ,  self )
397398
398399                else :
399400                    # security_protocol PLAINTEXT 
400401                    log .info ('%s: Connection complete.' , self )
401402                    self .state  =  ConnectionStates .CONNECTED 
402403                    self ._reset_reconnect_backoff ()
403-                     self .config ['state_change_callback' ](self )
404+                     self .config ['state_change_callback' ](self . node_id ,  self . _sock ,  self )
404405
405406            # Connection failed 
406407            # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems 
@@ -425,7 +426,7 @@ def connect(self):
425426                    log .info ('%s: Connection complete.' , self )
426427                    self .state  =  ConnectionStates .CONNECTED 
427428                    self ._reset_reconnect_backoff ()
428-                 self .config ['state_change_callback' ](self )
429+                 self .config ['state_change_callback' ](self . node_id ,  self . _sock ,  self )
429430
430431        if  self .state  is  ConnectionStates .AUTHENTICATING :
431432            assert  self .config ['security_protocol' ] in  ('SASL_PLAINTEXT' , 'SASL_SSL' )
@@ -435,7 +436,7 @@ def connect(self):
435436                    log .info ('%s: Connection complete.' , self )
436437                    self .state  =  ConnectionStates .CONNECTED 
437438                    self ._reset_reconnect_backoff ()
438-                     self .config ['state_change_callback' ](self )
439+                     self .config ['state_change_callback' ](self . node_id ,  self . _sock ,  self )
439440
440441        if  self .state  not  in ConnectionStates .CONNECTED ,
441442                              ConnectionStates .DISCONNECTED ):
@@ -802,15 +803,13 @@ def close(self, error=None):
802803                will be failed with this exception. 
803804                Default: kafka.errors.KafkaConnectionError. 
804805        """ 
806+         if  self .state  is  ConnectionStates .DISCONNECTED :
807+             return 
805808        with  self ._lock :
806809            if  self .state  is  ConnectionStates .DISCONNECTED :
807810                return 
808811            log .info ('%s: Closing connection. %s' , self , error  or  '' )
809-             self .state  =  ConnectionStates .DISCONNECTING 
810-             self .config ['state_change_callback' ](self )
811812            self ._update_reconnect_backoff ()
812-             self ._close_socket ()
813-             self .state  =  ConnectionStates .DISCONNECTED 
814813            self ._sasl_auth_future  =  None 
815814            self ._protocol  =  KafkaProtocol (
816815                client_id = self .config ['client_id' ],
@@ -819,9 +818,18 @@ def close(self, error=None):
819818                error  =  Errors .Cancelled (str (self ))
820819            ifrs  =  list (self .in_flight_requests .items ())
821820            self .in_flight_requests .clear ()
822-             self .config ['state_change_callback' ](self )
821+             self .state  =  ConnectionStates .DISCONNECTED 
822+             # To avoid race conditions and/or deadlocks 
823+             # keep a reference to the socket but leave it 
824+             # open until after the state_change_callback 
825+             # This should give clients a change to deregister 
826+             # the socket fd from selectors cleanly. 
827+             sock  =  self ._sock 
828+             self ._sock  =  None 
823829
824-         # drop lock before processing futures 
830+         # drop lock before state change callback and processing futures 
831+         self .config ['state_change_callback' ](self .node_id , sock , self )
832+         sock .close ()
825833        for  (_correlation_id , (future , _timestamp )) in  ifrs :
826834            future .failure (error )
827835
0 commit comments