44import struct
55from threading import local
66
7- from kafka .common import BufferUnderflowError
87from kafka .common import ConnectionError
98
109log = logging .getLogger ("kafka" )
@@ -19,14 +18,14 @@ class KafkaConnection(local):
1918 we can do something in here to facilitate multiplexed requests/responses
2019 since the Kafka API includes a correlation id.
2120 """
22- def __init__ (self , host , port , bufsize = 4096 ):
21+ def __init__ (self , host , port , timeout = 10 ):
2322 super (KafkaConnection , self ).__init__ ()
2423 self .host = host
2524 self .port = port
26- self .bufsize = bufsize
2725 self ._sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
2826 self ._sock .connect ((host , port ))
29- self ._sock .settimeout (10 )
27+ self .timeout = timeout
28+ self ._sock .settimeout (self .timeout )
3029 self ._dirty = False
3130
3231 def __str__ (self ):
@@ -36,44 +35,31 @@ def __str__(self):
3635 # Private API #
3736 ###################
3837
39- def _consume_response (self ):
40- """
41- Fully consume the response iterator
42- """
43- return "" .join (self ._consume_response_iter ())
44-
45- def _consume_response_iter (self ):
46- """
47- This method handles the response header and error messages. It
48- then returns an iterator for the chunks of the response
49- """
50- log .debug ("Handling response from Kafka" )
51-
52- # Read the size off of the header
53- resp = self ._sock .recv (4 )
54- if resp == "" :
55- self ._raise_connection_error ()
56- (size ,) = struct .unpack ('>i' , resp )
57-
58- messagesize = size - 4
59- log .debug ("About to read %d bytes from Kafka" , messagesize )
60-
61- # Read the remainder of the response
62- total = 0
63- while total < messagesize :
64- resp = self ._sock .recv (self .bufsize )
65- log .debug ("Read %d bytes from Kafka" , len (resp ))
66- if resp == "" :
67- raise BufferUnderflowError (
68- "Not enough data to read this response" )
69-
70- total += len (resp )
71- yield resp
72-
7338 def _raise_connection_error (self ):
7439 self ._dirty = True
7540 raise ConnectionError ("Kafka @ {}:{} went away" .format (self .host , self .port ))
7641
42+ def _read_bytes (self , num_bytes ):
43+ bytes_left = num_bytes
44+ resp = ''
45+ log .debug ("About to read %d bytes from Kafka" , num_bytes )
46+ if self ._dirty :
47+ self .reinit ()
48+ while bytes_left :
49+ try :
50+ data = self ._sock .recv (bytes_left )
51+ except socket .error :
52+ log .exception ('Unable to receive data from Kafka' )
53+ self ._raise_connection_error ()
54+ if data == '' :
55+ log .error ("Not enough data to read this response" )
56+ self ._raise_connection_error ()
57+ bytes_left -= len (data )
58+ log .debug ("Read %d/%d bytes from Kafka" , num_bytes - bytes_left , num_bytes )
59+ resp += data
60+
61+ return resp
62+
7763 ##################
7864 # Public API #
7965 ##################
@@ -89,7 +75,7 @@ def send(self, request_id, payload):
8975 sent = self ._sock .sendall (payload )
9076 if sent is not None :
9177 self ._raise_connection_error ()
92- except socket .error :
78+ except socket .error , e :
9379 log .exception ('Unable to send payload to Kafka' )
9480 self ._raise_connection_error ()
9581
@@ -98,8 +84,14 @@ def recv(self, request_id):
9884 Get a response from Kafka
9985 """
10086 log .debug ("Reading response %d from Kafka" % request_id )
101- self .data = self ._consume_response ()
102- return self .data
87+ # Read the size off of the header
88+ resp = self ._read_bytes (4 )
89+
90+ (size ,) = struct .unpack ('>i' , resp )
91+
92+ # Read the remainder of the response
93+ resp = self ._read_bytes (size )
94+ return str (resp )
10395
10496 def copy (self ):
10597 """
@@ -124,5 +116,5 @@ def reinit(self):
124116 self .close ()
125117 self ._sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
126118 self ._sock .connect ((self .host , self .port ))
127- self ._sock .settimeout (10 )
119+ self ._sock .settimeout (self . timeout )
128120 self ._dirty = False
0 commit comments