File tree Expand file tree Collapse file tree 1 file changed +10
-2
lines changed Expand file tree Collapse file tree 1 file changed +10
-2
lines changed Original file line number Diff line number Diff line change @@ -448,10 +448,16 @@ def task_done(self, message):
448448 message (KafkaMessage): the message to mark as complete
449449
450450 Returns:
451- Nothing
452-
451+ True, unless the topic-partition for this message has not
452+ been configured for the consumer. In normal operation, this
453+ should not happen. But see github issue 364.
453454 """
454455 topic_partition = (message .topic , message .partition )
456+ if topic_partition not in self ._topics :
457+ logger .warning ('Unrecognized topic/partition in task_done message: '
458+ '{0}:{1}' .format (* topic_partition ))
459+ return False
460+
455461 offset = message .offset
456462
457463 # Warn on non-contiguous offsets
@@ -476,6 +482,8 @@ def task_done(self, message):
476482 if self ._should_auto_commit ():
477483 self .commit ()
478484
485+ return True
486+
479487 def commit (self ):
480488 """Store consumed message offsets (marked via task_done())
481489 to kafka cluster for this consumer_group.
You can’t perform that action at this time.
0 commit comments