kafka/consumer/fetcher.py (61 additions, 48 deletions)
@@ -411,10 +411,10 @@ def _message_generator(self):
 
             tp = self._next_partition_records.topic_partition
 
-            # We can ignore any prior signal to drop pending message sets
+            # We can ignore any prior signal to drop pending record batches
             # because we are starting from a fresh one where fetch_offset == position
             # i.e., the user seek()'d to this position
-            self._subscriptions.assignment[tp].drop_pending_message_set = False
+            self._subscriptions.assignment[tp].drop_pending_record_batch = False
 
             for msg in self._next_partition_records.take():
 
@@ -430,12 +430,12 @@ def _message_generator(self):
                     break
 
                 # If there is a seek during message iteration,
-                # we should stop unpacking this message set and
+                # we should stop unpacking this record batch and
                 # wait for a new fetch response that aligns with the
                 # new seek position
-                elif self._subscriptions.assignment[tp].drop_pending_message_set:
-                    log.debug("Skipping remainder of message set for partition %s", tp)
-                    self._subscriptions.assignment[tp].drop_pending_message_set = False
+                elif self._subscriptions.assignment[tp].drop_pending_record_batch:
+                    log.debug("Skipping remainder of record batch for partition %s", tp)
+                    self._subscriptions.assignment[tp].drop_pending_record_batch = False
                     self._next_partition_records = None
                     break
 
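Reviewer note: the renamed flag implements a small handshake between seek() and the fetch generator. A seek raises the flag; the generator then discards the rest of the in-flight batch rather than yielding records for the old position. A minimal standalone sketch of the pattern (toy names, not the library's API):

```python
class PartitionState:
    """Toy stand-in for the per-partition assignment state."""
    def __init__(self):
        self.position = 0
        self.drop_pending_record_batch = False

    def seek(self, offset):
        self.position = offset
        self.drop_pending_record_batch = True  # signal the generator


def consume(state, batch):
    """Yield records until a seek invalidates the in-flight batch."""
    for offset, value in batch:
        if state.drop_pending_record_batch:
            state.drop_pending_record_batch = False
            break  # drop the remainder; wait for a fetch at the new position
        state.position = offset + 1
        yield offset, value


state = PartitionState()
gen = consume(state, [(0, 'a'), (1, 'b'), (2, 'c')])
print(next(gen))  # (0, 'a')
state.seek(100)   # user seeks mid-iteration
print(list(gen))  # [] -- remainder of the batch is dropped
```
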
@@ -451,16 +451,16 @@ def _message_generator(self):
 
         self._next_partition_records = None
 
-    def _unpack_message_set(self, tp, records):
+    def _unpack_records(self, tp, records):
         try:
             batch = records.next_batch()
             while batch is not None:
 
                 # Try DefaultsRecordBatch / message log format v2
                 # base_offset, last_offset_delta, and control batches
                 try:
-                    self._subscriptions.assignment[tp].last_offset_from_message_batch = batch.base_offset + \
-                        batch.last_offset_delta
+                    self._subscriptions.assignment[tp].last_offset_from_record_batch = batch.base_offset + \
+                        batch.last_offset_delta
                     # Control batches have a single record indicating whether a transaction
                     # was aborted or committed.
                     # When isolation_level is READ_COMMITTED (currently unsupported)
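
Reviewer note: the new name matches the v2 (magic=2) format terminology. The batch header carries base_offset and last_offset_delta, so the client learns the batch's last offset without decoding every record, and the arithmetic holds even when compaction has deleted the tail records. A worked example with hypothetical header values:

```python
# Hypothetical v2 batch header fields from a fetch response.
base_offset = 100      # offset of the first record in the batch
last_offset_delta = 4  # delta to the last offset the batch originally covered

# What the patched code stores in last_offset_from_record_batch:
last_offset_from_record_batch = base_offset + last_offset_delta
assert last_offset_from_record_batch == 104

# The next position may safely skip the whole batch, even if
# offsets 103-104 were removed by log compaction:
assert last_offset_from_record_batch + 1 == 105
```
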
@@ -673,17 +673,18 @@ def _create_fetch_requests(self):
         """
         # create the fetch info as a dict of lists of partition info tuples
         # which can be passed to FetchRequest() via .items()
+        version = self._client.api_version(FetchRequest, max_version=6)
         fetchable = collections.defaultdict(lambda: collections.defaultdict(list))
 
         for partition in self._fetchable_partitions():
             node_id = self._client.cluster.leader_for_partition(partition)
 
             # advance position for any deleted compacted messages if required
-            if self._subscriptions.assignment[partition].last_offset_from_message_batch:
-                next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_message_batch + 1
+            if self._subscriptions.assignment[partition].last_offset_from_record_batch:
+                next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_record_batch + 1
                 if next_offset_from_batch_header > self._subscriptions.assignment[partition].position:
                     log.debug(
-                        "Advance position for partition %s from %s to %s (last message batch location plus one)"
+                        "Advance position for partition %s from %s to %s (last record batch location plus one)"
                         " to correct for deleted compacted messages and/or transactional control records",
                         partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header)
                     self._subscriptions.assignment[partition].position = next_offset_from_batch_header
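
Reviewer note: the hint only ever moves the position forward; a hint at or behind the current position is ignored by the `>` guard. A compact restatement of that logic (hypothetical helper, not part of the patch):

```python
def advance_position(position, last_offset_from_record_batch):
    """Return the corrected fetch position given the batch-header hint."""
    if last_offset_from_record_batch is None:
        return position
    # Only advance; never rewind behind records not yet consumed.
    return max(position, last_offset_from_record_batch + 1)

assert advance_position(100, 104) == 105  # compaction gap: skip ahead
assert advance_position(110, 104) == 110  # stale hint: unchanged
```
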
@@ -697,52 +698,60 @@ def _create_fetch_requests(self):
                 self._client.cluster.request_update()
 
             elif self._client.in_flight_request_count(node_id) == 0:
-                partition_info = (
-                    partition.partition,
-                    position,
-                    self.config['max_partition_fetch_bytes']
-                )
+                if version < 5:
+                    partition_info = (
+                        partition.partition,
+                        position,
+                        self.config['max_partition_fetch_bytes']
+                    )
+                else:
+                    partition_info = (
+                        partition.partition,
+                        position,
+                        -1,  # log_start_offset is used internally by brokers / replicas only
+                        self.config['max_partition_fetch_bytes'],
+                    )
                 fetchable[node_id][partition.topic].append(partition_info)
                 log.debug("Adding fetch request for partition %s at offset %d",
                           partition, position)
             else:
                 log.log(0, "Skipping fetch for partition %s because there is an inflight request to node %s",
                         partition, node_id)
 
-        version = self._client.api_version(FetchRequest, max_version=4)
         requests = {}
         for node_id, partition_data in six.iteritems(fetchable):
-            if version < 3:
-                requests[node_id] = FetchRequest[version](
-                    -1,  # replica_id
-                    self.config['fetch_max_wait_ms'],
-                    self.config['fetch_min_bytes'],
-                    partition_data.items())
-            else:
-                # As of version == 3 partitions will be returned in order as
-                # they are requested, so to avoid starvation with
-                # `fetch_max_bytes` option we need this shuffle
-                # NOTE: we do have partition_data in random order due to usage
-                # of unordered structures like dicts, but that does not
-                # guarantee equal distribution, and starting in Python3.6
-                # dicts retain insert order.
-                partition_data = list(partition_data.items())
-                random.shuffle(partition_data)
-                if version == 3:
-                    requests[node_id] = FetchRequest[version](
-                        -1,  # replica_id
-                        self.config['fetch_max_wait_ms'],
-                        self.config['fetch_min_bytes'],
-                        self.config['fetch_max_bytes'],
-                        partition_data)
-                else:
-                    requests[node_id] = FetchRequest[version](
-                        -1,  # replica_id
-                        self.config['fetch_max_wait_ms'],
-                        self.config['fetch_min_bytes'],
-                        self.config['fetch_max_bytes'],
-                        self._isolation_level,
-                        partition_data)
+            # As of version == 3 partitions will be returned in order as
+            # they are requested, so to avoid starvation with
+            # `fetch_max_bytes` option we need this shuffle
+            # NOTE: we do have partition_data in random order due to usage
+            # of unordered structures like dicts, but that does not
+            # guarantee equal distribution, and starting in Python3.6
+            # dicts retain insert order.
+            partition_data = list(partition_data.items())
+            random.shuffle(partition_data)
+
+            if version <= 2:
+                requests[node_id] = FetchRequest[version](
+                    -1,  # replica_id
+                    self.config['fetch_max_wait_ms'],
+                    self.config['fetch_min_bytes'],
+                    partition_data)
+            elif version == 3:
+                requests[node_id] = FetchRequest[version](
+                    -1,  # replica_id
+                    self.config['fetch_max_wait_ms'],
+                    self.config['fetch_min_bytes'],
+                    self.config['fetch_max_bytes'],
+                    partition_data)
+            else:
+                # through v6
+                requests[node_id] = FetchRequest[version](
+                    -1,  # replica_id
+                    self.config['fetch_max_wait_ms'],
+                    self.config['fetch_min_bytes'],
+                    self.config['fetch_max_bytes'],
+                    self._isolation_level,
+                    partition_data)
         return requests
 
     def _handle_fetch_response(self, request, send_time, response):
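
Reviewer note: the per-partition tuple shape now depends on the negotiated version, because Fetch v5 inserted log_start_offset between the fetch offset and max_bytes. A sketch of the two layouts with placeholder values:

```python
# Placeholder values for one partition.
partition_index = 0
position = 42                       # next offset to fetch
max_partition_fetch_bytes = 1048576

# FetchRequest v0-v4: (partition, fetch_offset, max_bytes)
partition_info_v4 = (partition_index, position, max_partition_fetch_bytes)

# FetchRequest v5+: (partition, fetch_offset, log_start_offset, max_bytes).
# Consumers send -1 for log_start_offset; the field is only meaningful
# for follower (replica) fetches.
partition_info_v5 = (partition_index, position, -1, max_partition_fetch_bytes)
```
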
@@ -821,7 +830,7 @@ def _parse_fetched_data(self, completed_fetch):
                     log.debug("Adding fetched record for partition %s with"
                               " offset %d to buffered record list", tp,
                               position)
-                    unpacked = list(self._unpack_message_set(tp, records))
+                    unpacked = list(self._unpack_records(tp, records))
                     parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
                     if unpacked:
                         last_offset = unpacked[-1].offset
@@ -845,7 +854,9 @@ def _parse_fetched_data(self, completed_fetch):
                 self._sensors.record_topic_fetch_metrics(tp.topic, num_bytes, records_count)
 
             elif error_type in (Errors.NotLeaderForPartitionError,
-                                Errors.UnknownTopicOrPartitionError):
+                                Errors.UnknownTopicOrPartitionError,
+                                Errors.KafkaStorageError):
+                log.debug("Error fetching partition %s: %s", tp, error_type.__name__)
                 self._client.cluster.request_update()
             elif error_type is Errors.OffsetOutOfRangeError:
                 position = self._subscriptions.assignment[tp].position
@@ -862,8 +873,10 @@ def _parse_fetched_data(self, completed_fetch):
             elif error_type is Errors.TopicAuthorizationFailedError:
                 log.warning("Not authorized to read from topic %s.", tp.topic)
                 raise Errors.TopicAuthorizationFailedError(set(tp.topic))
-            elif error_type is Errors.UnknownError:
-                log.warning("Unknown error fetching data for topic-partition %s", tp)
+            elif error_type.is_retriable:
+                log.debug("Retriable error fetching partition %s: %s", tp, error_type())
+                if error_type.invalid_metadata:
+                    self._client.cluster.request_update()
             else:
                 raise error_type('Unexpected error while fetching data')
 
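Reviewer note: this replaces the one-off UnknownError case with the general contract that every exception class in kafka.errors exposes is_retriable and invalid_metadata flags. A standalone sketch of that dispatch pattern (toy error classes, not the library's):

```python
class BrokerError(Exception):
    is_retriable = False
    invalid_metadata = False

class NotLeaderError(BrokerError):
    is_retriable = True
    invalid_metadata = True  # leadership moved: metadata must be refreshed

def handle(error_type, request_metadata_update):
    if error_type.is_retriable:
        if error_type.invalid_metadata:
            request_metadata_update()
        return 'retry on next fetch'
    raise error_type('Unexpected error while fetching data')

print(handle(NotLeaderError, lambda: print('metadata refresh requested')))
```
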
kafka/consumer/subscription_state.py (6 additions, 6 deletions)
@@ -381,10 +381,10 @@ def __init__(self):
         self.reset_strategy = None  # the reset strategy if awaitingReset is set
         self._position = None  # offset exposed to the user
         self.highwater = None
-        self.drop_pending_message_set = False
-        # The last message offset hint available from a message batch with
+        self.drop_pending_record_batch = False
+        # The last message offset hint available from a record batch with
         # magic=2 which includes deleted compacted messages
-        self.last_offset_from_message_batch = None
+        self.last_offset_from_record_batch = None
 
     def _set_position(self, offset):
         assert self.has_valid_position, 'Valid position required'
@@ -399,16 +399,16 @@ def await_reset(self, strategy):
         self.awaiting_reset = True
         self.reset_strategy = strategy
         self._position = None
-        self.last_offset_from_message_batch = None
+        self.last_offset_from_record_batch = None
         self.has_valid_position = False
 
     def seek(self, offset):
         self._position = offset
         self.awaiting_reset = False
         self.reset_strategy = None
         self.has_valid_position = True
-        self.drop_pending_message_set = True
-        self.last_offset_from_message_batch = None
+        self.drop_pending_record_batch = True
+        self.last_offset_from_record_batch = None
 
     def pause(self):
         self.paused = True
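
Reviewer note: seek() now pairs the two resets: it raises the drop flag for any batch still being iterated and clears the stale batch-header hint, since that hint described data fetched for the old position. The post-conditions, condensed (toy class mirroring the fields above):

```python
class State:
    drop_pending_record_batch = False
    last_offset_from_record_batch = 104  # hint from an earlier fetch

    def seek(self, offset):
        self._position = offset
        self.drop_pending_record_batch = True
        self.last_offset_from_record_batch = None

state = State()
state.seek(200)
assert state.drop_pending_record_batch              # in-flight batch discarded
assert state.last_offset_from_record_batch is None  # stale hint cleared
```
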
kafka/producer/record_accumulator.py (1 addition, 1 deletion)
@@ -156,7 +156,7 @@ class RecordAccumulator(object):
             will also impact the compression ratio (more batching means better
             compression). Default: None.
         linger_ms (int): An artificial delay time to add before declaring a
-            messageset (that isn't full) ready for sending. This allows
+            record batch (that isn't full) ready for sending. This allows
             time for more records to arrive. Setting a non-zero linger_ms
             will trade off some latency for potentially better throughput
             due to more batching (and hence fewer, larger requests).
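
Reviewer note: linger_ms is surfaced through the public producer configuration; a small delay often improves batching under load. A usage sketch (the broker address is a placeholder):

```python
from kafka import KafkaProducer

# Wait up to 5ms for more records before sending a partially-full batch.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # placeholder address
    linger_ms=5,
)
producer.send('my-topic', b'payload')
producer.flush()
```
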
kafka/protocol/fetch.py (9 additions, 7 deletions)
@@ -14,7 +14,7 @@ class FetchResponse_v0(Response):
                 ('partition', Int32),
                 ('error_code', Int16),
                 ('highwater_offset', Int64),
-                ('message_set', Bytes)))))
+                ('records', Bytes)))))
     )
 
 
@@ -29,7 +29,7 @@ class FetchResponse_v1(Response):
                 ('partition', Int32),
                 ('error_code', Int16),
                 ('highwater_offset', Int64),
-                ('message_set', Bytes)))))
+                ('records', Bytes)))))
     )
 
 
@@ -46,6 +46,7 @@ class FetchResponse_v3(Response):
 
 
 class FetchResponse_v4(Response):
+    # Adds message format v2
     API_KEY = 1
     API_VERSION = 4
     SCHEMA = Schema(
@@ -60,7 +61,7 @@ class FetchResponse_v4(Response):
                 ('aborted_transactions', Array(
                     ('producer_id', Int64),
                     ('first_offset', Int64))),
-                ('message_set', Bytes)))))
+                ('records', Bytes)))))
     )
 
 
@@ -80,7 +81,7 @@ class FetchResponse_v5(Response):
                 ('aborted_transactions', Array(
                     ('producer_id', Int64),
                     ('first_offset', Int64))),
-                ('message_set', Bytes)))))
+                ('records', Bytes)))))
     )
 
 
@@ -115,7 +116,7 @@ class FetchResponse_v7(Response):
                 ('aborted_transactions', Array(
                     ('producer_id', Int64),
                     ('first_offset', Int64))),
-                ('message_set', Bytes)))))
+                ('records', Bytes)))))
     )
 
 
@@ -156,7 +157,7 @@ class FetchResponse_v11(Response):
                     ('producer_id', Int64),
                     ('first_offset', Int64))),
                 ('preferred_read_replica', Int32),
-                ('message_set', Bytes)))))
+                ('records', Bytes)))))
     )
 
 
@@ -211,6 +212,7 @@ class FetchRequest_v3(Request):
 
 class FetchRequest_v4(Request):
     # Adds isolation_level field
+    # Adds message format v2
     API_KEY = 1
     API_VERSION = 4
     RESPONSE_TYPE = FetchResponse_v4
@@ -264,7 +266,7 @@ class FetchRequest_v6(Request):
 
 class FetchRequest_v7(Request):
     """
-    Add incremental fetch requests
+    Add incremental fetch requests (see KIP-227)
     """
     API_KEY = 1
     API_VERSION = 7
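
Reviewer note: on the wire the renamed `records` field is still an opaque Bytes blob; the client decodes it via the internal kafka.record module, which iterates batches regardless of magic version. A hedged sketch of that round trip using internal APIs (subject to change between releases):

```python
from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder

# Build a v2 (magic=2) batch in memory, then decode it the way the
# fetcher decodes the `records` bytes of a FetchResponse partition.
builder = MemoryRecordsBuilder(magic=2, compression_type=0, batch_size=1024)
for value in (b'a', b'b', b'c'):
    builder.append(timestamp=None, key=None, value=value)
builder.close()

records = MemoryRecords(builder.buffer())
batch = records.next_batch()
print(batch.base_offset, batch.last_offset_delta)  # 0 2
for record in batch:
    print(record.offset, record.value)
```
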
kafka/protocol/produce.py (4 additions, 2 deletions)
@@ -47,6 +47,7 @@ class ProduceResponse_v2(Response):
 
 
 class ProduceResponse_v3(Response):
+    # Adds support for message format v2
     API_KEY = 0
     API_VERSION = 3
     SCHEMA = ProduceResponse_v2.SCHEMA
@@ -141,7 +142,7 @@ class ProduceRequest_v0(ProduceRequest):
             ('topic', String('utf-8')),
             ('partitions', Array(
                 ('partition', Int32),
-                ('messages', Bytes)))))
+                ('records', Bytes)))))
     )
 
 
@@ -158,6 +159,7 @@ class ProduceRequest_v2(ProduceRequest):
 
 
 class ProduceRequest_v3(ProduceRequest):
+    # Adds support for message format v2
     API_VERSION = 3
     RESPONSE_TYPE = ProduceResponse_v3
     SCHEMA = Schema(
@@ -168,7 +170,7 @@ class ProduceRequest_v3(ProduceRequest):
             ('topic', String('utf-8')),
             ('partitions', Array(
                 ('partition', Int32),
-                ('messages', Bytes)))))
+                ('records', Bytes)))))
     )
 
 
test/test_fetcher.py (2 additions, 2 deletions)
@@ -399,7 +399,7 @@ def test__handle_fetch_error(fetcher, caplog, exception, log_level):
     assert caplog.records[0].levelname == logging.getLevelName(log_level)
 
 
-def test__unpack_message_set(fetcher):
+def test__unpack_records(fetcher):
     fetcher.config['check_crcs'] = False
     tp = TopicPartition('foo', 0)
     messages = [
@@ -408,7 +408,7 @@ def test__unpack_message_set(fetcher):
         (None, b"c", None),
     ]
     memory_records = MemoryRecords(_build_record_batch(messages))
-    records = list(fetcher._unpack_message_set(tp, memory_records))
+    records = list(fetcher._unpack_records(tp, memory_records))
     assert len(records) == 3
     assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
     assert records[0].value == b'a'