
Commit 04a3816

Merge pull request #365 from dpkp/kafka_consumer_fixup
KafkaConsumer topic/partition fixes
2 parents 46ee816 + c0fc334

kafka/consumer/kafka.py

Lines changed: 32 additions & 28 deletions
@@ -194,10 +194,10 @@ def set_topic_partitions(self, *topics):
             elif isinstance(arg, tuple):
                 topic = kafka_bytestring(arg[0])
                 partition = arg[1]
+                self._consume_topic_partition(topic, partition)
                 if len(arg) == 3:
                     offset = arg[2]
                     self._offsets.fetch[(topic, partition)] = offset
-                self._consume_topic_partition(topic, partition)
 
             # { topic: partitions, ... } dict
             elif isinstance(arg, dict):
@@ -224,7 +224,7 @@ def set_topic_partitions(self, *topics):
                         topic = kafka_bytestring(key[0])
                         partition = key[1]
                         self._consume_topic_partition(topic, partition)
-                        self._offsets.fetch[key] = value
+                        self._offsets.fetch[(topic, partition)] = value
 
             else:
                 raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg))
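
These two hunks change how set_topic_partitions registers explicit partitions: the (topic, partition) pair is now registered before any explicit offset is stored, and dict-style offsets are keyed by the normalized (topic, partition) tuple rather than the raw dict key. Below is a minimal usage sketch of the argument forms involved; the topic name, broker address, and offsets are placeholders, and the broker config key may differ across kafka-python releases:

from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])

# plain topic string: all partitions
consumer.set_topic_partitions('my-topic')

# (topic, partition) tuple: a single partition
consumer.set_topic_partitions(('my-topic', 0))

# (topic, partition, offset) tuple: single partition at an explicit offset
consumer.set_topic_partitions(('my-topic', 0, 123))

# {(topic, partition): offset} dict -- the branch fixed in the second hunk,
# which now keys the fetch offset by the normalized tuple
consumer.set_topic_partitions({('my-topic', 0): 123})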
@@ -312,16 +312,16 @@ def fetch_messages(self):
         max_wait_time = self._config['fetch_wait_max_ms']
         min_bytes = self._config['fetch_min_bytes']
 
-        # Get current fetch offsets
-        offsets = self._offsets.fetch
-        if not offsets:
-            if not self._topics:
-                raise KafkaConfigurationError('No topics or partitions configured')
+        if not self._topics:
+            raise KafkaConfigurationError('No topics or partitions configured')
+
+        if not self._offsets.fetch:
             raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages')
 
-        fetches = []
-        for topic_partition, offset in six.iteritems(offsets):
-            fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes))
+        fetches = [FetchRequest(topic, partition,
+                                self._offsets.fetch[(topic, partition)],
+                                max_bytes)
+                   for (topic, partition) in self._topics]
 
         # client.send_fetch_request will collect topic/partition requests by leader
         # and send each group as a single FetchRequest to the correct broker
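
This hunk replaces the dict iteration with a list comprehension over self._topics, so fetch requests are built from the configured partitions rather than from whatever happens to be in the offsets dict. A standalone sketch of that construction, with illustrative stand-in data for self._topics and self._offsets.fetch:

from kafka.common import FetchRequest

topics = [(b'my-topic', 0), (b'my-topic', 1)]                 # stand-in for self._topics
fetch_offsets = {(b'my-topic', 0): 10, (b'my-topic', 1): 0}   # stand-in for self._offsets.fetch
max_bytes = 1024 * 1024

# one FetchRequest per configured (topic, partition), offset looked up
# from the consumer's fetch-offset map
fetches = [FetchRequest(topic, partition,
                        fetch_offsets[(topic, partition)],
                        max_bytes)
           for (topic, partition) in topics]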
@@ -336,49 +336,53 @@ def fetch_messages(self):
             return
 
         for resp in responses:
-            topic_partition = (resp.topic, resp.partition)
+            topic = kafka_bytestring(resp.topic)
+            partition = resp.partition
             try:
                 check_error(resp)
             except OffsetOutOfRangeError:
-                logger.warning('OffsetOutOfRange: topic %s, partition %d, offset %d '
-                               '(Highwatermark: %d)',
-                               resp.topic, resp.partition,
-                               offsets[topic_partition], resp.highwaterMark)
+                logger.warning('OffsetOutOfRange: topic %s, partition %d, '
+                               'offset %d (Highwatermark: %d)',
+                               topic, partition,
+                               self._offsets.fetch[(topic, partition)],
+                               resp.highwaterMark)
                 # Reset offset
-                self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
+                self._offsets.fetch[(topic, partition)] = (
+                    self._reset_partition_offset((topic, partition))
+                )
                 continue
 
             except NotLeaderForPartitionError:
                 logger.warning("NotLeaderForPartitionError for %s - %d. "
                                "Metadata may be out of date",
-                               resp.topic, resp.partition)
+                               topic, partition)
                 self._refresh_metadata_on_error()
                 continue
 
             except RequestTimedOutError:
                 logger.warning("RequestTimedOutError for %s - %d",
-                               resp.topic, resp.partition)
+                               topic, partition)
                 continue
 
             # Track server highwater mark
-            self._offsets.highwater[topic_partition] = resp.highwaterMark
+            self._offsets.highwater[(topic, partition)] = resp.highwaterMark
 
             # Yield each message
             # Kafka-python could raise an exception during iteration
             # we are not catching -- user will need to address
             for (offset, message) in resp.messages:
                 # deserializer_class could raise an exception here
-                msg = KafkaMessage(resp.topic,
-                                   resp.partition,
-                                   offset, message.key,
-                                   self._config['deserializer_class'](message.value))
-
-                if offset < self._offsets.fetch[topic_partition]:
-                    logger.debug('Skipping message %s because its offset is less than the consumer offset',
-                                 msg)
+                val = self._config['deserializer_class'](message.value)
+                msg = KafkaMessage(topic, partition, offset, message.key, val)
+
+                # in some cases the server will return earlier messages
+                # than we requested. skip them per kafka spec
+                if offset < self._offsets.fetch[(topic, partition)]:
+                    logger.debug('message offset less than fetched offset '
+                                 'skipping: %s', msg)
                     continue
                 # Only increment fetch offset if we safely got the message and deserialized
-                self._offsets.fetch[topic_partition] = offset + 1
+                self._offsets.fetch[(topic, partition)] = offset + 1
 
                 # Then yield to user
                 yield msg
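
Taken together, the fetch_messages changes decode resp.topic once via kafka_bytestring and key every offset lookup by the (topic, partition) tuple, so the messages yielded to the user and the consumer's internal offset bookkeeping stay in agreement. A hedged end-to-end sketch of consuming through this API; the topic name and broker address are placeholders:

from kafka import KafkaConsumer

consumer = KafkaConsumer(('my-topic', 0),
                         bootstrap_servers=['localhost:9092'])

# iterating the consumer drives fetch_messages() internally; each item
# is a KafkaMessage(topic, partition, offset, key, value)
for msg in consumer:
    print(msg.topic, msg.partition, msg.offset, msg.value)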
