|
6 | 6 | import logging
|
7 | 7 | from random import shuffle, uniform
|
8 | 8 | import socket
|
9 |
| -import time |
| 9 | +import struct |
10 | 10 | import sys
|
| 11 | +import time |
11 | 12 |
|
12 | 13 | from kafka.vendor import six
|
13 | 14 |
|
@@ -516,52 +517,40 @@ def _try_authenticate_gssapi(self, future):
|
516 | 517 | ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos)
|
517 | 518 | log.debug('%s: canonical Servicename: %s', self, ctx_CanonName)
|
518 | 519 | ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate')
|
519 |
| - # Exchange tokens until authentication either succeeds or fails: |
| 520 | + log.debug("%s: initiator name: %s", self, ctx_Context.initiator_name) |
| 521 | + |
| 522 | + # Exchange tokens until authentication either succeeds or fails |
520 | 523 | received_token = None
|
521 | 524 | try:
|
522 | 525 | while not ctx_Context.complete:
|
523 |
| - # calculate the output token |
524 |
| - try: |
525 |
| - output_token = ctx_Context.step(received_token) |
526 |
| - except GSSError as e: |
527 |
| - log.exception("%s: Error invalid token received from server", self) |
528 |
| - error = Errors.ConnectionError("%s: %s" % (self, e)) |
| 526 | + # calculate an output token from kafka token (or None if first iteration) |
| 527 | + output_token = ctx_Context.step(received_token) |
529 | 528 |
|
530 |
| - if not output_token: |
531 |
| - if ctx_Context.complete: |
532 |
| - log.debug("%s: Security Context complete ", self) |
533 |
| - log.debug("%s: Successful GSSAPI handshake for %s", self, ctx_Context.initiator_name) |
534 |
| - break |
| 529 | + # pass output token to kafka |
535 | 530 | try:
|
536 | 531 | self._sock.setblocking(True)
|
537 |
| - # Send output token |
538 | 532 | msg = output_token
|
539 | 533 | size = Int32.encode(len(msg))
|
540 | 534 | self._sock.sendall(size + msg)
|
541 |
| - |
542 | 535 | # The server will send a token back. Processing of this token either
|
543 | 536 | # establishes a security context, or it needs further token exchange.
|
544 | 537 | # The gssapi will be able to identify the needed next step.
|
545 | 538 | # The connection is closed on failure.
|
546 |
| - response = self._sock.recv(2000) |
| 539 | + header = self._sock.recv(4) |
| 540 | + token_size = struct.unpack('>i', header) |
| 541 | + received_token = self._sock.recv(token_size) |
547 | 542 | self._sock.setblocking(False)
|
548 | 543 |
|
549 |
| - except (AssertionError, ConnectionError) as e: |
| 544 | + except ConnectionError as e: |
550 | 545 | log.exception("%s: Error receiving reply from server", self)
|
551 | 546 | error = Errors.ConnectionError("%s: %s" % (self, e))
|
552 |
| - future.failure(error) |
553 | 547 | self.close(error=error)
|
554 |
| - |
555 |
| - # pass the received token back to gssapi, strip the first 4 bytes |
556 |
| - received_token = response[4:] |
| 548 | + return future.failure(error) |
557 | 549 |
|
558 | 550 | except Exception as e:
|
559 |
| - log.exception("%s: GSSAPI handshake error", self) |
560 |
| - error = Errors.ConnectionError("%s: %s" % (self, e)) |
561 |
| - future.failure(error) |
562 |
| - self.close(error=error) |
| 551 | + return future.failure(e) |
563 | 552 |
|
564 |
| - log.info('%s: Authenticated as %s', self, gssname) |
| 553 | + log.info('%s: Authenticated as %s via GSSAPI', self, gssname) |
565 | 554 | return future.success(True)
|
566 | 555 |
|
567 | 556 | def blacked_out(self):
|
|
0 commit comments