11import copy
2+ import logging
3+
24from collections import defaultdict
35from functools import partial
46from itertools import count
5- import logging
6- import time
77
8- from kafka .common import (
9- ErrorMapping , TopicAndPartition , ConnectionError ,
10- FailedPayloadsException
11- )
8+ from kafka .common import (ErrorMapping , TopicAndPartition ,
9+ ConnectionError , FailedPayloadsError ,
10+ BrokerResponseError , PartitionUnavailableError ,
11+ KafkaUnavailableError , KafkaRequestError )
12+
1213from kafka .conn import KafkaConnection
1314from kafka .protocol import KafkaProtocol
1415
@@ -29,8 +30,8 @@ def __init__(self, host, port, client_id=CLIENT_ID, timeout=10):
2930 }
3031 self .brokers = {} # broker_id -> BrokerMetadata
3132 self .topics_to_brokers = {} # topic_id -> broker_id
32- self .topic_partitions = defaultdict ( list ) # topic_id -> [0, 1, 2, ...]
33- self ._load_metadata_for_topics ()
33+ self .topic_partitions = {} # topic_id -> [0, 1, 2, ...]
34+ self .load_metadata_for_topics () # bootstrap with all metadata
3435
3536 ##################
3637 # Private API #
@@ -49,55 +50,13 @@ def _get_conn_for_broker(self, broker):
4950 def _get_leader_for_partition (self , topic , partition ):
5051 key = TopicAndPartition (topic , partition )
5152 if key not in self .topics_to_brokers :
52- self ._load_metadata_for_topics (topic )
53+ self .load_metadata_for_topics (topic )
5354
5455 if key not in self .topics_to_brokers :
55- raise Exception ("Partition does not exist: %s" % str (key ))
56+ raise KafkaRequestError ("Partition does not exist: %s" % str (key ))
5657
5758 return self .topics_to_brokers [key ]
5859
59- def _load_metadata_for_topics (self , * topics ):
60- """
61- Discover brokers and metadata for a set of topics. This method will
62- recurse in the event of a retry.
63- """
64- request_id = self ._next_id ()
65- request = KafkaProtocol .encode_metadata_request (self .client_id ,
66- request_id , topics )
67-
68- response = self ._send_broker_unaware_request (request_id , request )
69- if response is None :
70- raise Exception ("All servers failed to process request" )
71-
72- (brokers , topics ) = KafkaProtocol .decode_metadata_response (response )
73-
74- log .debug ("Broker metadata: %s" , brokers )
75- log .debug ("Topic metadata: %s" , topics )
76-
77- self .brokers = brokers
78- self .topics_to_brokers = {}
79-
80- for topic , partitions in topics .items ():
81- # Clear the list once before we add it. This removes stale entries
82- # and avoids duplicates
83- self .topic_partitions .pop (topic , None )
84-
85- if not partitions :
86- log .info ("Partition is unassigned, delay for 1s and retry" )
87- time .sleep (1 )
88- self ._load_metadata_for_topics (topic )
89- break
90-
91- for partition , meta in partitions .items ():
92- if meta .leader == - 1 :
93- log .info ("Partition is unassigned, delay for 1s and retry" )
94- time .sleep (1 )
95- self ._load_metadata_for_topics (topic )
96- else :
97- topic_part = TopicAndPartition (topic , partition )
98- self .topics_to_brokers [topic_part ] = brokers [meta .leader ]
99- self .topic_partitions [topic ].append (partition )
100-
10160 def _next_id (self ):
10261 """
10362 Generate a new correlation id
@@ -119,7 +78,7 @@ def _send_broker_unaware_request(self, requestId, request):
11978 "trying next server: %s" % (request , conn , e ))
12079 continue
12180
122- return None
81+ raise KafkaUnavailableError ( "All servers failed to process request" )
12382
12483 def _send_broker_aware_request (self , payloads , encoder_fn , decoder_fn ):
12584 """
@@ -150,6 +109,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
150109 for payload in payloads :
151110 leader = self ._get_leader_for_partition (payload .topic ,
152111 payload .partition )
112+ if leader == - 1 :
113+ raise PartitionUnavailableError ("Leader is unassigned for %s-%s" % payload .topic , payload .partition )
153114 payloads_by_broker [leader ].append (payload )
154115 original_keys .append ((payload .topic , payload .partition ))
155116
@@ -185,21 +146,51 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
185146
186147 if failed :
187148 failed_payloads += payloads
188- self .topics_to_brokers = {} # reset metadata
149+ self .reset_all_metadata ()
189150 continue
190151
191152 for response in decoder_fn (response ):
192153 acc [(response .topic , response .partition )] = response
193154
194155 if failed_payloads :
195- raise FailedPayloadsException (failed_payloads )
156+ raise FailedPayloadsError (failed_payloads )
196157
197158 # Order the accumulated responses by the original key order
198159 return (acc [k ] for k in original_keys ) if acc else ()
199160
161+ def _raise_on_response_error (self , resp ):
162+ if resp .error == ErrorMapping .NO_ERROR :
163+ return
164+
165+ if resp .error in (ErrorMapping .UNKNOWN_TOPIC_OR_PARTITON ,
166+ ErrorMapping .NOT_LEADER_FOR_PARTITION ):
167+ self .reset_topic_metadata (resp .topic )
168+
169+ raise BrokerResponseError (
170+ "Request for %s failed with errorcode=%d" %
171+ (TopicAndPartition (resp .topic , resp .partition ), resp .error ))
172+
200173 #################
201174 # Public API #
202175 #################
176+ def reset_topic_metadata (self , * topics ):
177+ for topic in topics :
178+ try :
179+ partitions = self .topic_partitions [topic ]
180+ except KeyError :
181+ continue
182+
183+ for partition in partitions :
184+ self .topics_to_brokers .pop (TopicAndPartition (topic , partition ), None )
185+
186+ del self .topic_partitions [topic ]
187+
188+ def reset_all_metadata (self ):
189+ self .topics_to_brokers .clear ()
190+ self .topic_partitions .clear ()
191+
192+ def has_metadata_for_topic (self , topic ):
193+ return topic in self .topic_partitions
203194
204195 def close (self ):
205196 for conn in self .conns .values ():
@@ -219,6 +210,36 @@ def reinit(self):
219210 for conn in self .conns .values ():
220211 conn .reinit ()
221212
213+ def load_metadata_for_topics (self , * topics ):
214+ """
215+ Discover brokers and metadata for a set of topics. This function is called
216+ lazily whenever metadata is unavailable.
217+ """
218+ request_id = self ._next_id ()
219+ request = KafkaProtocol .encode_metadata_request (self .client_id ,
220+ request_id , topics )
221+
222+ response = self ._send_broker_unaware_request (request_id , request )
223+
224+ (brokers , topics ) = KafkaProtocol .decode_metadata_response (response )
225+
226+ log .debug ("Broker metadata: %s" , brokers )
227+ log .debug ("Topic metadata: %s" , topics )
228+
229+ self .brokers = brokers
230+
231+ for topic , partitions in topics .items ():
232+ self .reset_topic_metadata (topic )
233+
234+ if not partitions :
235+ continue
236+
237+ self .topic_partitions [topic ] = []
238+ for partition , meta in partitions .items ():
239+ topic_part = TopicAndPartition (topic , partition )
240+ self .topics_to_brokers [topic_part ] = brokers [meta .leader ]
241+ self .topic_partitions [topic ].append (partition )
242+
222243 def send_produce_request (self , payloads = [], acks = 1 , timeout = 1000 ,
223244 fail_on_error = True , callback = None ):
224245 """
@@ -256,14 +277,9 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000,
256277
257278 out = []
258279 for resp in resps :
259- # Check for errors
260- if fail_on_error is True and resp .error != ErrorMapping .NO_ERROR :
261- raise Exception (
262- "ProduceRequest for %s failed with errorcode=%d" %
263- (TopicAndPartition (resp .topic , resp .partition ),
264- resp .error ))
265-
266- # Run the callback
280+ if fail_on_error is True :
281+ self ._raise_on_response_error (resp )
282+
267283 if callback is not None :
268284 out .append (callback (resp ))
269285 else :
@@ -289,14 +305,9 @@ def send_fetch_request(self, payloads=[], fail_on_error=True,
289305
290306 out = []
291307 for resp in resps :
292- # Check for errors
293- if fail_on_error is True and resp .error != ErrorMapping .NO_ERROR :
294- raise Exception (
295- "FetchRequest for %s failed with errorcode=%d" %
296- (TopicAndPartition (resp .topic , resp .partition ),
297- resp .error ))
298-
299- # Run the callback
308+ if fail_on_error is True :
309+ self ._raise_on_response_error (resp )
310+
300311 if callback is not None :
301312 out .append (callback (resp ))
302313 else :
@@ -312,9 +323,8 @@ def send_offset_request(self, payloads=[], fail_on_error=True,
312323
313324 out = []
314325 for resp in resps :
315- if fail_on_error is True and resp .error != ErrorMapping .NO_ERROR :
316- raise Exception ("OffsetRequest failed with errorcode=%s" ,
317- resp .error )
326+ if fail_on_error is True :
327+ self ._raise_on_response_error (resp )
318328 if callback is not None :
319329 out .append (callback (resp ))
320330 else :
@@ -330,9 +340,8 @@ def send_offset_commit_request(self, group, payloads=[],
330340
331341 out = []
332342 for resp in resps :
333- if fail_on_error is True and resp .error != ErrorMapping .NO_ERROR :
334- raise Exception ("OffsetCommitRequest failed with "
335- "errorcode=%s" , resp .error )
343+ if fail_on_error is True :
344+ self ._raise_on_response_error (resp )
336345
337346 if callback is not None :
338347 out .append (callback (resp ))
@@ -350,9 +359,8 @@ def send_offset_fetch_request(self, group, payloads=[],
350359
351360 out = []
352361 for resp in resps :
353- if fail_on_error is True and resp .error != ErrorMapping .NO_ERROR :
354- raise Exception ("OffsetCommitRequest failed with errorcode=%s" ,
355- resp .error )
362+ if fail_on_error is True :
363+ self ._raise_on_response_error (resp )
356364 if callback is not None :
357365 out .append (callback (resp ))
358366 else :
0 commit comments