Skip to content

Commit a6130d2

Browse files
authored
Use local copies in Fetcher._fetchable_partitions to avoid mutation errors (#1400)
1 parent ff13f87 commit a6130d2

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

kafka/consumer/fetcher.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -626,9 +626,12 @@ def _handle_offset_response(self, future, response):
626626

627627
def _fetchable_partitions(self):
628628
fetchable = self._subscriptions.fetchable_partitions()
629-
if self._next_partition_records:
630-
fetchable.discard(self._next_partition_records.topic_partition)
631-
for fetch in self._completed_fetches:
629+
# do not fetch a partition if we have a pending fetch response to process
630+
current = self._next_partition_records
631+
pending = copy.copy(self._completed_fetches)
632+
if current:
633+
fetchable.discard(current.topic_partition)
634+
for fetch in pending:
632635
fetchable.discard(fetch.topic_partition)
633636
return fetchable
634637

0 commit comments

Comments
 (0)