Skip to content

Commit 0d4e28f

Browse files
authored
Add kafka.protocol.parser.KafkaProtocol w/ receive and send (#1230)
1 parent 7305f03 commit 0d4e28f

File tree

3 files changed

+226
-146
lines changed

3 files changed

+226
-146
lines changed

kafka/conn.py

Lines changed: 44 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,10 @@
1414
import kafka.errors as Errors
1515
from kafka.future import Future
1616
from kafka.metrics.stats import Avg, Count, Max, Rate
17-
from kafka.protocol.api import RequestHeader
1817
from kafka.protocol.admin import SaslHandShakeRequest
19-
from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest
20-
from kafka.protocol.frame import KafkaBytes
18+
from kafka.protocol.commit import OffsetFetchRequest
2119
from kafka.protocol.metadata import MetadataRequest
20+
from kafka.protocol.parser import KafkaProtocol
2221
from kafka.protocol.types import Int32
2322
from kafka.version import __version__
2423

@@ -73,9 +72,6 @@ class ConnectionStates(object):
7372
CONNECTED = '<connected>'
7473
AUTHENTICATING = '<authenticating>'
7574

76-
InFlightRequest = collections.namedtuple('InFlightRequest',
77-
['request', 'response_type', 'correlation_id', 'future', 'timestamp'])
78-
7975

8076
class BrokerConnection(object):
8177
"""Initialize a Kafka broker connection
@@ -230,19 +226,17 @@ def __init__(self, host, port, afi, **configs):
230226
assert gssapi is not None, 'GSSAPI lib not available'
231227
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
232228

229+
self._protocol = KafkaProtocol(
230+
client_id=self.config['client_id'],
231+
api_version=self.config['api_version'])
233232
self.state = ConnectionStates.DISCONNECTED
234233
self._reset_reconnect_backoff()
235234
self._sock = None
236235
self._ssl_context = None
237236
if self.config['ssl_context'] is not None:
238237
self._ssl_context = self.config['ssl_context']
239238
self._sasl_auth_future = None
240-
self._header = KafkaBytes(4)
241-
self._rbuffer = None
242-
self._receiving = False
243239
self.last_attempt = 0
244-
self._processing = False
245-
self._correlation_id = 0
246240
self._gai = None
247241
self._gai_index = 0
248242
self._sensors = None
@@ -635,19 +629,16 @@ def close(self, error=None):
635629
self.state = ConnectionStates.DISCONNECTED
636630
self.last_attempt = time.time()
637631
self._sasl_auth_future = None
638-
self._reset_buffer()
632+
self._protocol = KafkaProtocol(
633+
client_id=self.config['client_id'],
634+
api_version=self.config['api_version'])
639635
if error is None:
640636
error = Errors.Cancelled(str(self))
641637
while self.in_flight_requests:
642-
ifr = self.in_flight_requests.popleft()
643-
ifr.future.failure(error)
638+
(_, future, _) = self.in_flight_requests.popleft()
639+
future.failure(error)
644640
self.config['state_change_callback'](self)
645641

646-
def _reset_buffer(self):
647-
self._receiving = False
648-
self._header.seek(0)
649-
self._rbuffer = None
650-
651642
def send(self, request):
652643
"""send request, return Future()
653644
@@ -665,13 +656,8 @@ def send(self, request):
665656
def _send(self, request):
666657
assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED)
667658
future = Future()
668-
correlation_id = self._next_correlation_id()
669-
header = RequestHeader(request,
670-
correlation_id=correlation_id,
671-
client_id=self.config['client_id'])
672-
message = b''.join([header.encode(), request.encode()])
673-
size = Int32.encode(len(message))
674-
data = size + message
659+
correlation_id = self._protocol.send_request(request)
660+
data = self._protocol.send_bytes()
675661
try:
676662
# In the future we might manage an internal write buffer
677663
# and send bytes asynchronously. For now, just block
@@ -693,11 +679,7 @@ def _send(self, request):
693679
log.debug('%s Request %d: %s', self, correlation_id, request)
694680

695681
if request.expect_response():
696-
ifr = InFlightRequest(request=request,
697-
correlation_id=correlation_id,
698-
response_type=request.RESPONSE_TYPE,
699-
future=future,
700-
timestamp=time.time())
682+
ifr = (correlation_id, future, time.time())
701683
self.in_flight_requests.append(ifr)
702684
else:
703685
future.success(None)
@@ -714,7 +696,6 @@ def recv(self):
714696
715697
Return response if available
716698
"""
717-
assert not self._processing, 'Recursion not supported'
718699
if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING:
719700
log.warning('%s cannot recv: socket not connected', self)
720701
# If requests are pending, we should close the socket and
@@ -727,15 +708,28 @@ def recv(self):
727708
log.warning('%s: No in-flight-requests to recv', self)
728709
return ()
729710

730-
response = self._recv()
731-
if not response and self.requests_timed_out():
711+
responses = self._recv()
712+
if not responses and self.requests_timed_out():
732713
log.warning('%s timed out after %s ms. Closing connection.',
733714
self, self.config['request_timeout_ms'])
734715
self.close(error=Errors.RequestTimedOutError(
735716
'Request timed out after %s ms' %
736717
self.config['request_timeout_ms']))
737718
return ()
738-
return response
719+
720+
for response in responses:
721+
(correlation_id, future, timestamp) = self.in_flight_requests.popleft()
722+
if isinstance(response, Errors.KafkaError):
723+
self.close(response)
724+
break
725+
726+
if self._sensors:
727+
self._sensors.request_time.record((time.time() - timestamp) * 1000)
728+
729+
log.debug('%s Response %d: %s', self, correlation_id, response)
730+
future.success(response)
731+
732+
return responses
739733

740734
def _recv(self):
741735
responses = []
@@ -751,10 +745,7 @@ def _recv(self):
751745
log.error('%s: socket disconnected', self)
752746
self.close(error=Errors.ConnectionError('socket disconnected'))
753747
break
754-
else:
755-
responses.extend(self.receive_bytes(data))
756-
if len(data) < SOCK_CHUNK_BYTES:
757-
break
748+
758749
except SSLWantReadError:
759750
break
760751
except ConnectionError as e:
@@ -768,118 +759,26 @@ def _recv(self):
768759
if six.PY3:
769760
break
770761
raise
771-
return responses
772762

773-
def receive_bytes(self, data):
774-
i = 0
775-
n = len(data)
776-
responses = []
777-
if self._sensors:
778-
self._sensors.bytes_received.record(n)
779-
while i < n:
780-
781-
# Not receiving is the state of reading the payload header
782-
if not self._receiving:
783-
bytes_to_read = min(4 - self._header.tell(), n - i)
784-
self._header.write(data[i:i+bytes_to_read])
785-
i += bytes_to_read
786-
787-
if self._header.tell() == 4:
788-
self._header.seek(0)
789-
nbytes = Int32.decode(self._header)
790-
# reset buffer and switch state to receiving payload bytes
791-
self._rbuffer = KafkaBytes(nbytes)
792-
self._receiving = True
793-
elif self._header.tell() > 4:
794-
raise Errors.KafkaError('this should not happen - are you threading?')
795-
796-
797-
if self._receiving:
798-
total_bytes = len(self._rbuffer)
799-
staged_bytes = self._rbuffer.tell()
800-
bytes_to_read = min(total_bytes - staged_bytes, n - i)
801-
self._rbuffer.write(data[i:i+bytes_to_read])
802-
i += bytes_to_read
803-
804-
staged_bytes = self._rbuffer.tell()
805-
if staged_bytes > total_bytes:
806-
self.close(error=Errors.KafkaError('Receive buffer has more bytes than expected?'))
807-
808-
if staged_bytes != total_bytes:
809-
break
763+
if self._sensors:
764+
self._sensors.bytes_received.record(len(data))
810765

811-
self._receiving = False
812-
self._rbuffer.seek(0)
813-
resp = self._process_response(self._rbuffer)
814-
if resp is not None:
815-
responses.append(resp)
816-
self._reset_buffer()
817-
return responses
766+
try:
767+
more_responses = self._protocol.receive_bytes(data)
768+
except Errors.KafkaProtocolError as e:
769+
self.close(e)
770+
break
771+
else:
772+
responses.extend([resp for (_, resp) in more_responses])
818773

819-
def _process_response(self, read_buffer):
820-
assert not self._processing, 'Recursion not supported'
821-
self._processing = True
822-
recv_correlation_id = Int32.decode(read_buffer)
823-
824-
if not self.in_flight_requests:
825-
error = Errors.CorrelationIdError(
826-
'%s: No in-flight-request found for server response'
827-
' with correlation ID %d'
828-
% (self, recv_correlation_id))
829-
self.close(error)
830-
self._processing = False
831-
return None
832-
else:
833-
ifr = self.in_flight_requests.popleft()
834-
835-
if self._sensors:
836-
self._sensors.request_time.record((time.time() - ifr.timestamp) * 1000)
837-
838-
# verify send/recv correlation ids match
839-
840-
# 0.8.2 quirk
841-
if (self.config['api_version'] == (0, 8, 2) and
842-
ifr.response_type is GroupCoordinatorResponse[0] and
843-
ifr.correlation_id != 0 and
844-
recv_correlation_id == 0):
845-
log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse'
846-
' Correlation ID does not match request. This'
847-
' should go away once at least one topic has been'
848-
' initialized on the broker.')
849-
850-
elif ifr.correlation_id != recv_correlation_id:
851-
error = Errors.CorrelationIdError(
852-
'%s: Correlation IDs do not match: sent %d, recv %d'
853-
% (self, ifr.correlation_id, recv_correlation_id))
854-
ifr.future.failure(error)
855-
self.close(error)
856-
self._processing = False
857-
return None
858-
859-
# decode response
860-
try:
861-
response = ifr.response_type.decode(read_buffer)
862-
except ValueError:
863-
read_buffer.seek(0)
864-
buf = read_buffer.read()
865-
log.error('%s Response %d [ResponseType: %s Request: %s]:'
866-
' Unable to decode %d-byte buffer: %r', self,
867-
ifr.correlation_id, ifr.response_type,
868-
ifr.request, len(buf), buf)
869-
error = Errors.UnknownError('Unable to decode response')
870-
ifr.future.failure(error)
871-
self.close(error)
872-
self._processing = False
873-
return None
874-
875-
log.debug('%s Response %d: %s', self, ifr.correlation_id, response)
876-
ifr.future.success(response)
877-
self._processing = False
878-
return response
774+
if len(data) < SOCK_CHUNK_BYTES:
775+
break
776+
777+
return responses
879778

880779
def requests_timed_out(self):
881780
if self.in_flight_requests:
882-
oldest_at = self.in_flight_requests[0].timestamp
781+
(_, _, oldest_at) = self.in_flight_requests[0]
883782
timeout = self.config['request_timeout_ms'] / 1000.0
884783
if time.time() >= oldest_at + timeout:
885784
return True

kafka/errors.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ class NodeNotReadyError(KafkaError):
3333
retriable = True
3434

3535

36-
class CorrelationIdError(KafkaError):
36+
class KafkaProtocolError(KafkaError):
37+
retriable = True
38+
39+
40+
class CorrelationIdError(KafkaProtocolError):
3741
retriable = True
3842

3943

0 commit comments

Comments
 (0)