2222
2323class KafkaClient (object ):
2424
25- CLIENT_ID = b" kafka-python"
25+ CLIENT_ID = b' kafka-python'
2626
2727 # NOTE: The timeout given to the client should always be greater than the
2828 # one passed to SimpleConsumer.get_message(), otherwise you can get a
@@ -50,7 +50,7 @@ def __init__(self, hosts, client_id=CLIENT_ID,
5050 ##################
5151
5252 def _get_conn (self , host , port ):
53- "Get or create a connection to a broker using host and port"
53+ """ Get or create a connection to a broker using host and port"" "
5454 host_key = (host , port )
5555 if host_key not in self .conns :
5656 self .conns [host_key ] = KafkaConnection (
@@ -122,10 +122,10 @@ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
122122 return decoder_fn (response )
123123
124124 except Exception :
125- log .exception ("Could not send request [%r ] to server %s:%i, "
126- " trying next server" % ( requestId , host , port ) )
125+ log .exception ('Error sending request [%s ] to server %s:%s, '
126+ ' trying next server' , requestId , host , port )
127127
128- raise KafkaUnavailableError (" All servers failed to process request" )
128+ raise KafkaUnavailableError (' All servers failed to process request' )
129129
130130 def _send_broker_aware_request (self , payloads , encoder_fn , decoder_fn ):
131131 """
@@ -180,7 +180,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
180180
181181 except ConnectionError as e :
182182 broker_failures .append (broker )
183- log .warning (" Could not send request [%s] to server %s: %s" ,
183+ log .warning (' Could not send request [%s] to server %s: %s' ,
184184 binascii .b2a_hex (request ), broker , e )
185185
186186 for payload in payloads :
@@ -201,15 +201,14 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
201201 response = conn .recv (requestId )
202202 except ConnectionError as e :
203203 broker_failures .append (broker )
204- log .warning (" Could not receive response to request [%s] "
205- " from server %s: %s" ,
204+ log .warning (' Could not receive response to request [%s] '
205+ ' from server %s: %s' ,
206206 binascii .b2a_hex (request ), conn , e )
207207
208208 for payload in payloads :
209209 responses_by_broker [broker ].append (FailedPayloadsError (payload ))
210210
211211 else :
212-
213212 for payload_response in decoder_fn (response ):
214213 responses_by_broker [broker ].append (payload_response )
215214
@@ -300,7 +299,7 @@ def ensure_topic_exists(self, topic, timeout = 30):
300299
301300 while not self .has_metadata_for_topic (topic ):
302301 if time .time () > start_time + timeout :
303- raise KafkaTimeoutError (" Unable to create topic {0}" .format (topic ))
302+ raise KafkaTimeoutError (' Unable to create topic {0}' .format (topic ))
304303 try :
305304 self .load_metadata_for_topics (topic )
306305 except LeaderNotAvailableError :
@@ -348,8 +347,8 @@ def load_metadata_for_topics(self, *topics):
348347
349348 resp = self .send_metadata_request (topics )
350349
351- log .debug (" Received new broker metadata: %s" , resp .brokers )
352- log .debug (" Received new topic metadata: %s" , resp .topics )
350+ log .debug (' Received new broker metadata: %s' , resp .brokers )
351+ log .debug (' Received new topic metadata: %s' , resp .topics )
353352
354353 self .brokers = dict ([(broker .nodeId , broker )
355354 for broker in resp .brokers ])
@@ -368,7 +367,7 @@ def load_metadata_for_topics(self, *topics):
368367 raise
369368
370369 # Otherwise, just log a warning
371- log .error (" Error loading topic metadata for %s: %s" , topic , type (e ))
370+ log .error (' Error loading topic metadata for %s: %s' , topic , type (e ))
372371 continue
373372
374373 self .topic_partitions [topic ] = {}
@@ -409,7 +408,6 @@ def load_metadata_for_topics(self, *topics):
409408
410409 def send_metadata_request (self , payloads = [], fail_on_error = True ,
411410 callback = None ):
412-
413411 encoder = KafkaProtocol .encode_metadata_request
414412 decoder = KafkaProtocol .decode_metadata_response
415413
0 commit comments