From 6bd9dc8f183b7fac952ff446dd7332230ffa7215 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 8 Oct 2017 12:09:13 -0700 Subject: [PATCH 1/4] Check for disconnects during ssl handshake and sasl authentication --- kafka/conn.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 467519e3d..8f6e4a832 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -299,12 +299,15 @@ def connect(self): self._sock.setsockopt(*option) self._sock.setblocking(False) + self.last_attempt = time.time() + self.state = ConnectionStates.CONNECTING if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): self._wrap_ssl() - log.info('%s: connecting to %s:%d', self, self.host, self.port) - self.state = ConnectionStates.CONNECTING - self.last_attempt = time.time() - self.config['state_change_callback'](self) + # _wrap_ssl can alter the connection state -- disconnects on failure + # so we need to double check that we are still connecting before + if self.connecting(): + self.config['state_change_callback'](self) + log.info('%s: connecting to %s:%d', self, self.host, self.port) if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex @@ -367,10 +370,12 @@ def connect(self): if self.state is ConnectionStates.AUTHENTICATING: assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL') if self._try_authenticate(): - log.debug('%s: Connection complete.', self) - self.state = ConnectionStates.CONNECTED - self._reset_reconnect_backoff() - self.config['state_change_callback'](self) + # _try_authenticate has side-effects: possibly disconnected on socket errors + if self.state is ConnectionStates.AUTHENTICATING: + log.debug('%s: Connection complete.', self) + self.state = ConnectionStates.CONNECTED + self._reset_reconnect_backoff() + self.config['state_change_callback'](self) return self.state @@ -397,10 +402,7 @@ def _wrap_ssl(self): password=self.config['ssl_password']) if self.config['ssl_crlfile']: if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'): - error = 'No CRL support with this version of Python.' - log.error('%s: %s Disconnecting.', self, error) - self.close(Errors.ConnectionError(error)) - return + raise RuntimeError('This version of Python does not support ssl_crlfile!') log.info('%s: Loading SSL CRL from %s', self, self.config['ssl_crlfile']) self._ssl_context.load_verify_locations(self.config['ssl_crlfile']) # pylint: disable=no-member @@ -493,13 +495,15 @@ def _try_authenticate_plain(self, future): except (AssertionError, ConnectionError) as e: log.exception("%s: Error receiving reply from server", self) error = Errors.ConnectionError("%s: %s" % (self, e)) - future.failure(error) self.close(error=error) + return future.failure(error) if data != b'\x00\x00\x00\x00': - return future.failure(Errors.AuthenticationFailedError()) + error = Errors.AuthenticationFailedError('Unrecognized response during authentication') + future.failure(error) + raise error - log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username']) + log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username']) return future.success(True) def _try_authenticate_gssapi(self, future): From 45063c3c4680226cdec73db9df999ba9f00d44fc Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 8 Oct 2017 12:26:07 -0700 Subject: [PATCH 2/4] Dont raise ConnectionError --- kafka/conn.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 8f6e4a832..e2d770784 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -445,7 +445,9 @@ def _try_authenticate(self): self._sasl_auth_future = future self._recv() if self._sasl_auth_future.failed(): - raise self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type + ex = self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type + if not isinstance(ex, Errors.ConnectionError): + raise ex return self._sasl_auth_future.succeeded() def _handle_sasl_handshake_response(self, future, response): @@ -488,11 +490,10 @@ def _try_authenticate_plain(self, future): error = Errors.AuthenticationFailedError( 'Authentication failed for user {0}'.format( self.config['sasl_plain_username'])) - future.failure(error) - raise error + return future.failure(error) data += fragment self._sock.setblocking(False) - except (AssertionError, ConnectionError) as e: + except ConnectionError as e: log.exception("%s: Error receiving reply from server", self) error = Errors.ConnectionError("%s: %s" % (self, e)) self.close(error=error) @@ -500,8 +501,7 @@ def _try_authenticate_plain(self, future): if data != b'\x00\x00\x00\x00': error = Errors.AuthenticationFailedError('Unrecognized response during authentication') - future.failure(error) - raise error + return future.failure(error) log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username']) return future.success(True) From 8bffeb9e277692fb4e3450275e9c5e4a6af10c0b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 8 Oct 2017 13:58:43 -0700 Subject: [PATCH 3/4] Fixup pylint disable message --- kafka/conn.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index e2d770784..a1314131c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -445,9 +445,9 @@ def _try_authenticate(self): self._sasl_auth_future = future self._recv() if self._sasl_auth_future.failed(): - ex = self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type + ex = self._sasl_auth_future.exception if not isinstance(ex, Errors.ConnectionError): - raise ex + raise ex # pylint: disable-msg=raising-bad-type return self._sasl_auth_future.succeeded() def _handle_sasl_handshake_response(self, future, response): From 807d0b6614964ac0c1ed85809f2ffd357305d8ea Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 10 Oct 2017 11:12:16 -0700 Subject: [PATCH 4/4] Handle blocking socket recv during authentication --- kafka/conn.py | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index a1314131c..e10d4f1d7 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -467,6 +467,19 @@ def _handle_sasl_handshake_response(self, future, response): 'kafka-python does not support SASL mechanism %s' % self.config['sasl_mechanism'])) + def _recv_bytes_blocking(self, n): + self._sock.setblocking(True) + try: + data = b'' + while len(data) < n: + fragment = self._sock.recv(n - len(data)) + if not fragment: + raise ConnectionError('Connection reset during recv') + data += fragment + return data + finally: + self._sock.setblocking(False) + def _try_authenticate_plain(self, future): if self.config['security_protocol'] == 'SASL_PLAINTEXT': log.warning('%s: Sending username and password in the clear', self) @@ -480,19 +493,12 @@ def _try_authenticate_plain(self, future): self.config['sasl_plain_password']]).encode('utf-8')) size = Int32.encode(len(msg)) self._sock.sendall(size + msg) + self._sock.setblocking(False) # The server will send a zero sized message (that is Int32(0)) on success. # The connection is closed on failure - while len(data) < 4: - fragment = self._sock.recv(4 - len(data)) - if not fragment: - log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username']) - error = Errors.AuthenticationFailedError( - 'Authentication failed for user {0}'.format( - self.config['sasl_plain_username'])) - return future.failure(error) - data += fragment - self._sock.setblocking(False) + self._recv_bytes_blocking(4) + except ConnectionError as e: log.exception("%s: Error receiving reply from server", self) error = Errors.ConnectionError("%s: %s" % (self, e)) @@ -528,14 +534,15 @@ def _try_authenticate_gssapi(self, future): msg = output_token size = Int32.encode(len(msg)) self._sock.sendall(size + msg) + self._sock.setblocking(False) + # The server will send a token back. Processing of this token either # establishes a security context, or it needs further token exchange. # The gssapi will be able to identify the needed next step. # The connection is closed on failure. - header = self._sock.recv(4) + header = self._recv_bytes_blocking(4) token_size = struct.unpack('>i', header) - received_token = self._sock.recv(token_size) - self._sock.setblocking(False) + received_token = self._recv_bytes_blocking(token_size) except ConnectionError as e: log.exception("%s: Error receiving reply from server", self)