From 14c73cf9d3546bdfaa17fee0eda0b7bd904c2f18 Mon Sep 17 00:00:00 2001 From: Keith So Date: Wed, 20 Feb 2019 16:53:54 +0800 Subject: [PATCH 1/2] Keep track of last offset information if it is available --- kafka/consumer/fetcher.py | 19 +++++++++++++++++++ kafka/record/default_records.py | 4 ++++ kafka/version.py | 2 +- 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 36388319e..f1bce1919 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -119,6 +119,7 @@ def __init__(self, client, subscriptions, metrics, **configs): self._fetch_futures = collections.deque() self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix']) self._isolation_level = READ_UNCOMMITTED + self._last_offset_from_batch = {} def send_fetches(self): """Send FetchRequests for all assigned partitions that do not already have @@ -447,6 +448,13 @@ def _unpack_message_set(self, tp, records): try: batch = records.next_batch() while batch is not None: + + # LegacyRecordBatch cannot access either base_offset or last_offset_delta + try: + self._last_offset_from_batch[tp] = batch.base_offset + batch.last_offset_delta + except AttributeError: + pass + for record in batch: key_size = len(record.key) if record.key is not None else -1 value_size = len(record.value) if record.value is not None else -1 @@ -651,6 +659,17 @@ def _create_fetch_requests(self): for partition in self._fetchable_partitions(): node_id = self._client.cluster.leader_for_partition(partition) + + # advance extra if there are compacted messages that are skipped that we know of from a message batch header + if partition in self._last_offset_from_batch: + if self._last_offset_from_batch[partition] + 1 > self._subscriptions.assignment[partition].position: + log.debug( + "Advance position for partition %s from %s to %s (last message batch location plus one)" + " to correct for deleted compacted messages", + partition, self._subscriptions.assignment[partition].position, + self._last_offset_from_batch[partition] + 1) + self._subscriptions.assignment[partition].position = self._last_offset_from_batch[partition] + 1 + position = self._subscriptions.assignment[partition].position # fetch if there is a leader and no in-flight requests diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index 955e3ee2a..7f0e2b331 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -140,6 +140,10 @@ def crc(self): def attributes(self): return self._header_data[5] + @property + def last_offset_delta(self): + return self._header_data[6] + @property def compression_type(self): return self.attributes & self.CODEC_MASK diff --git a/kafka/version.py b/kafka/version.py index 9e0feee72..f6868c591 100644 --- a/kafka/version.py +++ b/kafka/version.py @@ -1 +1 @@ -__version__ = '1.4.4' +__version__ = '1.4.4et1' From b981c64cb1dc8741305275e9c9c1e70382d59693 Mon Sep 17 00:00:00 2001 From: Keith So Date: Fri, 22 Feb 2019 09:11:58 +0800 Subject: [PATCH 2/2] Reuse subscription state object --- kafka/consumer/fetcher.py | 16 ++++++++-------- kafka/consumer/subscription_state.py | 5 +++++ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index f1bce1919..2b66be37f 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -119,7 +119,6 @@ def __init__(self, client, subscriptions, metrics, **configs): self._fetch_futures = collections.deque() self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix']) self._isolation_level = READ_UNCOMMITTED - self._last_offset_from_batch = {} def send_fetches(self): """Send FetchRequests for all assigned partitions that do not already have @@ -451,7 +450,8 @@ def _unpack_message_set(self, tp, records): # LegacyRecordBatch cannot access either base_offset or last_offset_delta try: - self._last_offset_from_batch[tp] = batch.base_offset + batch.last_offset_delta + self._subscriptions.assignment[tp].last_offset_from_message_batch = batch.base_offset + \ + batch.last_offset_delta except AttributeError: pass @@ -660,15 +660,15 @@ def _create_fetch_requests(self): for partition in self._fetchable_partitions(): node_id = self._client.cluster.leader_for_partition(partition) - # advance extra if there are compacted messages that are skipped that we know of from a message batch header - if partition in self._last_offset_from_batch: - if self._last_offset_from_batch[partition] + 1 > self._subscriptions.assignment[partition].position: + # advance position for any deleted compacted messages if required + if self._subscriptions.assignment[partition].last_offset_from_message_batch: + next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_message_batch + 1 + if next_offset_from_batch_header > self._subscriptions.assignment[partition].position: log.debug( "Advance position for partition %s from %s to %s (last message batch location plus one)" " to correct for deleted compacted messages", - partition, self._subscriptions.assignment[partition].position, - self._last_offset_from_batch[partition] + 1) - self._subscriptions.assignment[partition].position = self._last_offset_from_batch[partition] + 1 + partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header) + self._subscriptions.assignment[partition].position = next_offset_from_batch_header position = self._subscriptions.assignment[partition].position diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 4b0b275c1..ef501661a 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -382,6 +382,9 @@ def __init__(self): self._position = None # offset exposed to the user self.highwater = None self.drop_pending_message_set = False + # The last message offset hint available from a message batch with + # magic=2 which includes deleted compacted messages + self.last_offset_from_message_batch = None def _set_position(self, offset): assert self.has_valid_position, 'Valid position required' @@ -396,6 +399,7 @@ def await_reset(self, strategy): self.awaiting_reset = True self.reset_strategy = strategy self._position = None + self.last_offset_from_message_batch = None self.has_valid_position = False def seek(self, offset): @@ -404,6 +408,7 @@ def seek(self, offset): self.reset_strategy = None self.has_valid_position = True self.drop_pending_message_set = True + self.last_offset_from_message_batch = None def pause(self): self.paused = True