NOTE: relative to the patch as originally generated, the added lines in
kafka/consumer/fetcher.py were revised, so the post-image blob hash on that
file's "index" line is stale.

diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c1eb03ef6..36e269f19 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -439,6 +439,18 @@ def _unpack_message_set(self, tp, records):
         try:
             batch = records.next_batch()
             while batch is not None:
+
+                # Remember the last offset covered by this batch so the fetch
+                # position can later be advanced past records removed by
+                # compaction. LegacyRecordBatch exposes neither base_offset
+                # nor last_offset_delta, so no hint is recorded for it.
+                try:
+                    last_offset = batch.base_offset + batch.last_offset_delta
+                except AttributeError:
+                    pass
+                else:
+                    self._subscriptions.assignment[tp].last_offset_from_message_batch = last_offset
+
                 for record in batch:
                     key_size = len(record.key) if record.key is not None else -1
                     value_size = len(record.value) if record.value is not None else -1
@@ -643,6 +655,21 @@ def _create_fetch_requests(self):
 
         for partition in self._fetchable_partitions():
             node_id = self._client.cluster.leader_for_partition(partition)
+
+            # Advance the position past any deleted compacted messages: if
+            # the last batch header ended beyond the current position, resume
+            # fetching right after the end of that batch. The hint is compared
+            # against None explicitly because 0 is a valid offset.
+            last_batch_offset = self._subscriptions.assignment[partition].last_offset_from_message_batch
+            if last_batch_offset is not None:
+                next_offset_from_batch_header = last_batch_offset + 1
+                if next_offset_from_batch_header > self._subscriptions.assignment[partition].position:
+                    log.debug(
+                        "Advance position for partition %s from %s to %s (last message batch location plus one)"
+                        " to correct for deleted compacted messages",
+                        partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header)
+                    self._subscriptions.assignment[partition].position = next_offset_from_batch_header
+
             position = self._subscriptions.assignment[partition].position
 
             # fetch if there is a leader and no in-flight requests
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 4b0b275c1..ef501661a 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -382,6 +382,9 @@ def __init__(self):
         self._position = None # offset exposed to the user
         self.highwater = None
         self.drop_pending_message_set = False
+        # The last message offset hint available from a message batch with
+        # magic=2 which includes deleted compacted messages
+        self.last_offset_from_message_batch = None
 
     def _set_position(self, offset):
         assert self.has_valid_position, 'Valid position required'
@@ -396,6 +399,7 @@ def await_reset(self, strategy):
         self.awaiting_reset = True
         self.reset_strategy = strategy
         self._position = None
+        self.last_offset_from_message_batch = None
         self.has_valid_position = False
 
     def seek(self, offset):
@@ -404,6 +408,7 @@ def seek(self, offset):
         self.reset_strategy = None
         self.has_valid_position = True
         self.drop_pending_message_set = True
+        self.last_offset_from_message_batch = None
 
     def pause(self):
         self.paused = True
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 955e3ee2a..7f0e2b331 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -140,6 +140,10 @@ def crc(self):
     def attributes(self):
         return self._header_data[5]
 
+    @property
+    def last_offset_delta(self):
+        return self._header_data[6]
+
     @property
     def compression_type(self):
         return self.attributes & self.CODEC_MASK