Skip to content

Commit 4cadaaf

Browse files
authored
Fix KafkaConsumer compacted offset handling (#1397)
1 parent e66d8c4 commit 4cadaaf

File tree

2 files changed

+28
-10
lines changed

2 files changed

+28
-10
lines changed

kafka/consumer/fetcher.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -372,11 +372,6 @@ def _append(self, drained, part, max_records):
372372
tp, next_offset)
373373

374374
for record in part_records:
375-
# Fetched compressed messages may include additional records
376-
if record.offset < fetch_offset:
377-
log.debug("Skipping message offset: %s (expecting %s)",
378-
record.offset, fetch_offset)
379-
continue
380375
drained[tp].append(record)
381376

382377
self._subscriptions.assignment[tp].position = next_offset
@@ -843,10 +838,15 @@ def __init__(self, fetch_offset, tp, messages):
843838
# When fetching an offset that is in the middle of a
844839
# compressed batch, we will get all messages in the batch.
845840
# But we want to start 'take' at the fetch_offset
841+
# (or the next highest offset in case the message was compacted)
846842
for i, msg in enumerate(messages):
847-
if msg.offset == fetch_offset:
843+
if msg.offset < fetch_offset:
844+
log.debug("Skipping message offset: %s (expecting %s)",
845+
msg.offset, fetch_offset)
846+
else:
848847
self.message_idx = i
849848
break
849+
850850
else:
851851
self.message_idx = 0
852852
self.messages = None
@@ -868,8 +868,9 @@ def take(self, n=None):
868868
next_idx = self.message_idx + n
869869
res = self.messages[self.message_idx:next_idx]
870870
self.message_idx = next_idx
871-
if len(self) > 0:
872-
self.fetch_offset = self.messages[self.message_idx].offset
871+
# fetch_offset should be incremented by 1 to parallel the
872+
# subscription position (also incremented by 1)
873+
self.fetch_offset = max(self.fetch_offset, res[-1].offset + 1)
873874
return res
874875

875876

test/test_fetcher.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -514,8 +514,8 @@ def test_partition_records_offset():
514514
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
515515
assert len(records) > 0
516516
msgs = records.take(1)
517-
assert msgs[0].offset == 123
518-
assert records.fetch_offset == 124
517+
assert msgs[0].offset == fetch_offset
518+
assert records.fetch_offset == fetch_offset + 1
519519
msgs = records.take(2)
520520
assert len(msgs) == 2
521521
assert len(records) > 0
@@ -538,3 +538,20 @@ def test_partition_records_no_fetch_offset():
538538
for i in range(batch_start, batch_end)]
539539
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
540540
assert len(records) == 0
541+
542+
543+
def test_partition_records_compacted_offset():
544+
"""Test that messagesets are handle correctly
545+
when the fetch offset points to a message that has been compacted
546+
"""
547+
batch_start = 0
548+
batch_end = 100
549+
fetch_offset = 42
550+
tp = TopicPartition('foo', 0)
551+
messages = [ConsumerRecord(tp.topic, tp.partition, i,
552+
None, None, 'key', 'value', 'checksum', 0, 0)
553+
for i in range(batch_start, batch_end) if i != fetch_offset]
554+
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
555+
assert len(records) == batch_end - fetch_offset - 1
556+
msgs = records.take(1)
557+
assert msgs[0].offset == fetch_offset + 1

0 commit comments

Comments
 (0)