From 758a21be88d7cbb51445d5df6bd003bb3d1b34bb Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 8 Oct 2017 12:06:58 -0700 Subject: [PATCH 1/4] Fix GSSAPI authentication --- kafka/conn.py | 74 ++++++++++++++++++++++----------------------------- 1 file changed, 32 insertions(+), 42 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index dbe212a7c..067dc0faa 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -514,52 +514,42 @@ def _try_authenticate_gssapi(self, future): ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos) log.debug('%s: canonical Servicename: %s', self, ctx_CanonName) ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate') - # Exchange tokens until authentication either succeeds or fails: + log.debug("%s: initiator name: %s", self, ctx_Context.initiator_name) + + # Exchange tokens until authentication either succeeds or fails received_token = None - try: - while not ctx_Context.complete: - # calculate the output token - try: - output_token = ctx_Context.step(received_token) - except GSSError as e: - log.exception("%s: Error invalid token received from server", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) - - if not output_token: - if ctx_Context.complete: - log.debug("%s: Security Context complete ", self) - log.debug("%s: Successful GSSAPI handshake for %s", self, ctx_Context.initiator_name) - break - try: - self._sock.setblocking(True) - # Send output token - msg = output_token - size = Int32.encode(len(msg)) - self._sock.sendall(size + msg) - - # The server will send a token back. Processing of this token either - # establishes a security context, or it needs further token exchange. - # The gssapi will be able to identify the needed next step. - # The connection is closed on failure. - response = self._sock.recv(2000) - self._sock.setblocking(False) - - except (AssertionError, ConnectionError) as e: - log.exception("%s: Error receiving reply from server", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) - future.failure(error) - self.close(error=error) + while not ctx_Context.complete: + # calculate an output token from kafka token (or None if first iteration) + # exceptions raised here are uncaught and will be sent to the user + output_token = ctx_Context.step(received_token) - # pass the received token back to gssapi, strip the first 4 bytes - received_token = response[4:] + # pass output token to kafka + try: + self._sock.setblocking(True) + msg = output_token + size = Int32.encode(len(msg)) + self._sock.sendall(size + msg) + # The server will send a token back. Processing of this token either + # establishes a security context, or it needs further token exchange. + # The gssapi will be able to identify the needed next step. + # The connection is closed on failure. + response = self._sock.recv(2000) + self._sock.setblocking(False) - except Exception as e: - log.exception("%s: GSSAPI handshake error", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) - future.failure(error) - self.close(error=error) + except (AssertionError, ConnectionError) as e: + log.exception("%s: Error receiving reply from server", self) + error = Errors.ConnectionError("%s: %s" % (self, e)) + self.close(error=error) + return future.failure(error) + + # pass the received token back to gssapi, strip the first 4 bytes + # dpkp note: what are the first four bytes here? + # it seems likely that this is the encoded message size + # which we should receive and parse first, then use to parse + # the remainder of the token + received_token = response[4:] - log.info('%s: Authenticated as %s', self, gssname) + log.info('%s: Authenticated as %s via GSSAPI', self, gssname) return future.success(True) def blacked_out(self): From 4d6d9ec485f9b44bc7fa989c843d8b1dfdca9bb2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 8 Oct 2017 12:37:03 -0700 Subject: [PATCH 2/4] Return gssapi errors via future -- outer layer will raise to user --- kafka/conn.py | 63 +++++++++++++++++++++++++++------------------------ 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 067dc0faa..cd79e7f98 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -518,36 +518,39 @@ def _try_authenticate_gssapi(self, future): # Exchange tokens until authentication either succeeds or fails received_token = None - while not ctx_Context.complete: - # calculate an output token from kafka token (or None if first iteration) - # exceptions raised here are uncaught and will be sent to the user - output_token = ctx_Context.step(received_token) - - # pass output token to kafka - try: - self._sock.setblocking(True) - msg = output_token - size = Int32.encode(len(msg)) - self._sock.sendall(size + msg) - # The server will send a token back. Processing of this token either - # establishes a security context, or it needs further token exchange. - # The gssapi will be able to identify the needed next step. - # The connection is closed on failure. - response = self._sock.recv(2000) - self._sock.setblocking(False) - - except (AssertionError, ConnectionError) as e: - log.exception("%s: Error receiving reply from server", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) - self.close(error=error) - return future.failure(error) - - # pass the received token back to gssapi, strip the first 4 bytes - # dpkp note: what are the first four bytes here? - # it seems likely that this is the encoded message size - # which we should receive and parse first, then use to parse - # the remainder of the token - received_token = response[4:] + try: + while not ctx_Context.complete: + # calculate an output token from kafka token (or None if first iteration) + output_token = ctx_Context.step(received_token) + + # pass output token to kafka + try: + self._sock.setblocking(True) + msg = output_token + size = Int32.encode(len(msg)) + self._sock.sendall(size + msg) + # The server will send a token back. Processing of this token either + # establishes a security context, or it needs further token exchange. + # The gssapi will be able to identify the needed next step. + # The connection is closed on failure. + response = self._sock.recv(2000) + self._sock.setblocking(False) + + except ConnectionError as e: + log.exception("%s: Error receiving reply from server", self) + error = Errors.ConnectionError("%s: %s" % (self, e)) + self.close(error=error) + return future.failure(error) + + # pass the received token back to gssapi, strip the first 4 bytes + # dpkp note: what are the first four bytes here? + # it seems likely that this is the encoded message size + # which we should receive and parse first, then use to parse + # the remainder of the token + received_token = response[4:] + + except Exception as e: + return future.failure(e) log.info('%s: Authenticated as %s via GSSAPI', self, gssname) return future.success(True) From c06627a0907e58d8166bb5d8424fb54b997e43f2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 8 Oct 2017 20:06:59 -0700 Subject: [PATCH 3/4] Read gssapi token using decoded size --- kafka/conn.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index cd79e7f98..ce2487edb 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -533,7 +533,9 @@ def _try_authenticate_gssapi(self, future): # establishes a security context, or it needs further token exchange. # The gssapi will be able to identify the needed next step. # The connection is closed on failure. - response = self._sock.recv(2000) + header = self._sock.recv(4) + token_size = struct.unpack('>i', header) + received_token = self._sock.recv(token_size) self._sock.setblocking(False) except ConnectionError as e: @@ -542,13 +544,6 @@ def _try_authenticate_gssapi(self, future): self.close(error=error) return future.failure(error) - # pass the received token back to gssapi, strip the first 4 bytes - # dpkp note: what are the first four bytes here? - # it seems likely that this is the encoded message size - # which we should receive and parse first, then use to parse - # the remainder of the token - received_token = response[4:] - except Exception as e: return future.failure(e) From 8e400a0001f5eb6c9f14620fcf1833ad9742c737 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 10 Oct 2017 10:11:51 -0700 Subject: [PATCH 4/4] Import struct --- kafka/conn.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index ce2487edb..da06c914d 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -6,8 +6,9 @@ import logging from random import shuffle, uniform import socket -import time +import struct import sys +import time from kafka.vendor import six