diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 9dd4b84c9..512d56dc3 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -411,10 +411,10 @@ def _message_generator(self):
             tp = self._next_partition_records.topic_partition

-            # We can ignore any prior signal to drop pending message sets
+            # We can ignore any prior signal to drop pending record batches
             # because we are starting from a fresh one where fetch_offset == position
             # i.e., the user seek()'d to this position
-            self._subscriptions.assignment[tp].drop_pending_message_set = False
+            self._subscriptions.assignment[tp].drop_pending_record_batch = False

            for msg in self._next_partition_records.take():
@@ -430,12 +430,12 @@ def _message_generator(self):
                    break

                # If there is a seek during message iteration,
-                # we should stop unpacking this message set and
+                # we should stop unpacking this record batch and
                # wait for a new fetch response that aligns with the
                # new seek position
-                elif self._subscriptions.assignment[tp].drop_pending_message_set:
-                    log.debug("Skipping remainder of message set for partition %s", tp)
-                    self._subscriptions.assignment[tp].drop_pending_message_set = False
+                elif self._subscriptions.assignment[tp].drop_pending_record_batch:
+                    log.debug("Skipping remainder of record batch for partition %s", tp)
+                    self._subscriptions.assignment[tp].drop_pending_record_batch = False
                    self._next_partition_records = None
                    break
@@ -451,7 +451,7 @@ def _message_generator(self):

            self._next_partition_records = None

-    def _unpack_message_set(self, tp, records):
+    def _unpack_records(self, tp, records):
        try:
            batch = records.next_batch()
            while batch is not None:
@@ -459,8 +459,8 @@
                # Try DefaultsRecordBatch / message log format v2
                # base_offset, last_offset_delta, and control batches
                try:
-                    self._subscriptions.assignment[tp].last_offset_from_message_batch = batch.base_offset + \
-                        batch.last_offset_delta
+                    self._subscriptions.assignment[tp].last_offset_from_record_batch = batch.base_offset + \
+                        batch.last_offset_delta
                    # Control batches have a single record indicating whether a transaction
                    # was aborted or committed.
                    # When isolation_level is READ_COMMITTED (currently unsupported)
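The hint stored in `last_offset_from_record_batch` is plain arithmetic on two message-format-v2 batch-header fields: `base_offset` plus `last_offset_delta` gives the last offset the batch covered, even when compaction or transactional control records removed the records themselves. A toy illustration with hypothetical values (not library code):

```python
# Toy stand-in for the two v2 batch-header fields read in _unpack_records.
class BatchHeader(object):
    def __init__(self, base_offset, last_offset_delta):
        self.base_offset = base_offset                # offset of the first record
        self.last_offset_delta = last_offset_delta    # last offset minus base offset

batch = BatchHeader(base_offset=100, last_offset_delta=4)

# The batch spans offsets 100..104, even if some of those records were
# compacted away or were control records.
last_offset_from_record_batch = batch.base_offset + batch.last_offset_delta
assert last_offset_from_record_batch == 104
```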
@@ -673,17 +673,18 @@ def _create_fetch_requests(self):
        """
        # create the fetch info as a dict of lists of partition info tuples
        # which can be passed to FetchRequest() via .items()
+        version = self._client.api_version(FetchRequest, max_version=6)
        fetchable = collections.defaultdict(lambda: collections.defaultdict(list))

        for partition in self._fetchable_partitions():
            node_id = self._client.cluster.leader_for_partition(partition)

            # advance position for any deleted compacted messages if required
-            if self._subscriptions.assignment[partition].last_offset_from_message_batch:
-                next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_message_batch + 1
+            if self._subscriptions.assignment[partition].last_offset_from_record_batch:
+                next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_record_batch + 1
                if next_offset_from_batch_header > self._subscriptions.assignment[partition].position:
                    log.debug(
-                        "Advance position for partition %s from %s to %s (last message batch location plus one)"
+                        "Advance position for partition %s from %s to %s (last record batch location plus one)"
                        " to correct for deleted compacted messages and/or transactional control records",
                        partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header)
                    self._subscriptions.assignment[partition].position = next_offset_from_batch_header
@@ -697,11 +698,19 @@ def _create_fetch_requests(self):
                self._client.cluster.request_update()
            elif self._client.in_flight_request_count(node_id) == 0:
-                partition_info = (
-                    partition.partition,
-                    position,
-                    self.config['max_partition_fetch_bytes']
-                )
+                if version < 5:
+                    partition_info = (
+                        partition.partition,
+                        position,
+                        self.config['max_partition_fetch_bytes']
+                    )
+                else:
+                    partition_info = (
+                        partition.partition,
+                        position,
+                        -1,  # log_start_offset is used internally by brokers / replicas only
+                        self.config['max_partition_fetch_bytes'],
+                    )
                fetchable[node_id][partition.topic].append(partition_info)
                log.debug("Adding fetch request for partition %s at offset %d",
                          partition, position)
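The `version < 5` split above only changes the shape of the per-partition tuple: Fetch v5 inserted a `log_start_offset` field, which a consumer always sends as -1 (it is meaningful only to brokers and replica fetchers). A sketch of the two shapes, assuming kafka-python's default `max_partition_fetch_bytes` of 1048576 (the helper name is illustrative, not part of the library):

```python
DEFAULT_MAX_PARTITION_FETCH_BYTES = 1048576  # kafka-python default (1 MiB)

def partition_entry(partition_id, position, api_version):
    """Build the per-partition tuple for a FetchRequest, mirroring the
    branch added in _create_fetch_requests."""
    if api_version < 5:
        return (partition_id, position, DEFAULT_MAX_PARTITION_FETCH_BYTES)
    # v5+ inserts log_start_offset; consumers always send -1 here
    return (partition_id, position, -1, DEFAULT_MAX_PARTITION_FETCH_BYTES)

assert partition_entry(0, 42, 4) == (0, 42, 1048576)
assert partition_entry(0, 42, 6) == (0, 42, -1, 1048576)
```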
@@ -709,40 +718,40 @@
                log.log(0, "Skipping fetch for partition %s because there is an inflight request to node %s",
                        partition, node_id)

-        version = self._client.api_version(FetchRequest, max_version=4)
        requests = {}
        for node_id, partition_data in six.iteritems(fetchable):
-            if version < 3:
+            # As of version == 3 partitions will be returned in order as
+            # they are requested, so to avoid starvation with
+            # `fetch_max_bytes` option we need this shuffle
+            # NOTE: we do have partition_data in random order due to usage
+            #       of unordered structures like dicts, but that does not
+            #       guarantee equal distribution, and starting in Python3.6
+            #       dicts retain insert order.
+            partition_data = list(partition_data.items())
+            random.shuffle(partition_data)
+
+            if version <= 2:
+                requests[node_id] = FetchRequest[version](
+                    -1,  # replica_id
+                    self.config['fetch_max_wait_ms'],
+                    self.config['fetch_min_bytes'],
+                    partition_data)
+            elif version == 3:
                requests[node_id] = FetchRequest[version](
                    -1,  # replica_id
                    self.config['fetch_max_wait_ms'],
                    self.config['fetch_min_bytes'],
-                    partition_data.items())
+                    self.config['fetch_max_bytes'],
+                    partition_data)
            else:
-                # As of version == 3 partitions will be returned in order as
-                #  they are requested, so to avoid starvation with
-                #  `fetch_max_bytes` option we need this shuffle
-                # NOTE: we do have partition_data in random order due to usage
-                #  of unordered structures like dicts, but that does not
-                #  guarantee equal distribution, and starting in Python3.6
-                #  dicts retain insert order.
-                partition_data = list(partition_data.items())
-                random.shuffle(partition_data)
-                if version == 3:
-                    requests[node_id] = FetchRequest[version](
-                        -1,  # replica_id
-                        self.config['fetch_max_wait_ms'],
-                        self.config['fetch_min_bytes'],
-                        self.config['fetch_max_bytes'],
-                        partition_data)
-                else:
-                    requests[node_id] = FetchRequest[version](
-                        -1,  # replica_id
-                        self.config['fetch_max_wait_ms'],
-                        self.config['fetch_min_bytes'],
-                        self.config['fetch_max_bytes'],
-                        self._isolation_level,
-                        partition_data)
+                # through v6
+                requests[node_id] = FetchRequest[version](
+                    -1,  # replica_id
+                    self.config['fetch_max_wait_ms'],
+                    self.config['fetch_min_bytes'],
+                    self.config['fetch_max_bytes'],
+                    self._isolation_level,
+                    partition_data)
        return requests

    def _handle_fetch_response(self, request, send_time, response):
@@ -821,7 +830,7 @@ def _parse_fetched_data(self, completed_fetch):
                    log.debug("Adding fetched record for partition %s with"
                              " offset %d to buffered record list", tp, position)
-                    unpacked = list(self._unpack_message_set(tp, records))
+                    unpacked = list(self._unpack_records(tp, records))
                    parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
                    if unpacked:
                        last_offset = unpacked[-1].offset
@@ -845,7 +854,9 @@ def _parse_fetched_data(self, completed_fetch):
                self._sensors.record_topic_fetch_metrics(tp.topic, num_bytes, records_count)
            elif error_type in (Errors.NotLeaderForPartitionError,
-                                Errors.UnknownTopicOrPartitionError):
+                                Errors.UnknownTopicOrPartitionError,
+                                Errors.KafkaStorageError):
+                log.debug("Error fetching partition %s: %s", tp, error_type.__name__)
                self._client.cluster.request_update()
            elif error_type is Errors.OffsetOutOfRangeError:
                position = self._subscriptions.assignment[tp].position
@@ -862,8 +873,10 @@ def _parse_fetched_data(self, completed_fetch):
            elif error_type is Errors.TopicAuthorizationFailedError:
                log.warning("Not authorized to read from topic %s.", tp.topic)
                raise Errors.TopicAuthorizationFailedError(set(tp.topic))
-            elif error_type is Errors.UnknownError:
-                log.warning("Unknown error fetching data for topic-partition %s", tp)
+            elif error_type.is_retriable:
+                log.debug("Retriable error fetching partition %s: %s", tp, error_type())
+                if error_type.invalid_metadata:
+                    self._client.cluster.request_update()
            else:
                raise error_type('Unexpected error while fetching data')
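Hoisting the shuffle out of the `version >= 3` branch matters because, from Fetch v3 on, the broker fills `fetch_max_bytes` walking partitions in the order they appear in the request, so a fixed order lets the head partitions starve the tail. A toy model of that effect (hypothetical sizes, not broker code):

```python
import random

def fill_response(order, backlog_bytes, fetch_max_bytes):
    """Serve partitions in request order until the byte budget runs out."""
    served, budget = {}, fetch_max_bytes
    for p in order:
        take = min(backlog_bytes[p], budget)
        served[p] = take
        budget -= take
    return served

backlog = {'t-0': 800, 't-1': 800, 't-2': 800}

# Fixed order: 't-2' receives 0 bytes on every single fetch.
print(fill_response(['t-0', 't-1', 't-2'], backlog, 1000))

# Shuffled order, as _create_fetch_requests now does for every protocol
# version: over repeated fetches each partition gets a turn at the front.
order = list(backlog)
random.shuffle(order)
print(fill_response(order, backlog, 1000))
```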
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 08842d133..5ca7c7346 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -381,10 +381,10 @@ def __init__(self):
        self.reset_strategy = None  # the reset strategy if awaitingReset is set
        self._position = None  # offset exposed to the user
        self.highwater = None
-        self.drop_pending_message_set = False
-        # The last message offset hint available from a message batch with
+        self.drop_pending_record_batch = False
+        # The last message offset hint available from a record batch with
        #   magic=2 which includes deleted compacted messages
-        self.last_offset_from_message_batch = None
+        self.last_offset_from_record_batch = None

    def _set_position(self, offset):
        assert self.has_valid_position, 'Valid position required'
@@ -399,7 +399,7 @@ def await_reset(self, strategy):
        self.awaiting_reset = True
        self.reset_strategy = strategy
        self._position = None
-        self.last_offset_from_message_batch = None
+        self.last_offset_from_record_batch = None
        self.has_valid_position = False

    def seek(self, offset):
@@ -407,8 +407,8 @@ def seek(self, offset):
        self.awaiting_reset = False
        self.reset_strategy = None
        self.has_valid_position = True
-        self.drop_pending_message_set = True
-        self.last_offset_from_message_batch = None
+        self.drop_pending_record_batch = True
+        self.last_offset_from_record_batch = None

    def pause(self):
        self.paused = True
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index a2aa0e8ec..f13c21b9f 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -156,7 +156,7 @@ class RecordAccumulator(object):
            will also impact the compression ratio (more batching means better
            compression). Default: None.
        linger_ms (int): An artificial delay time to add before declaring a
-            messageset (that isn't full) ready for sending. This allows
+            record batch (that isn't full) ready for sending. This allows
            time for more records to arrive. Setting a non-zero linger_ms
            will trade off some latency for potentially better throughput
            due to more batching (and hence fewer, larger requests).
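The renamed flag keeps its semantics: `seek()` raises it, and the fetcher's message generator checks it between records, clearing it and discarding the rest of the buffered batch so that iteration resumes only from a fetch aligned with the new position. A minimal sketch of that lifecycle (hypothetical class, not the library API):

```python
class PartitionStateSketch(object):
    """Stand-in for the partition state, reduced to the seek interaction."""
    def __init__(self):
        self.position = 0
        self.drop_pending_record_batch = False

    def seek(self, offset):
        self.position = offset
        self.drop_pending_record_batch = True  # buffered records are now stale

def drain(state, buffered_records):
    for record in buffered_records:
        if state.drop_pending_record_batch:
            # mirrors _message_generator: clear the flag and stop unpacking
            state.drop_pending_record_batch = False
            return
        yield record

state = PartitionStateSketch()
it = drain(state, iter(range(10)))
next(it)               # 0
state.seek(100)        # user seeks mid-iteration
assert list(it) == []  # remainder of the old batch is dropped
```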
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
index f367848ce..1b77e9025 100644
--- a/kafka/protocol/fetch.py
+++ b/kafka/protocol/fetch.py
@@ -14,7 +14,7 @@ class FetchResponse_v0(Response):
                ('partition', Int32),
                ('error_code', Int16),
                ('highwater_offset', Int64),
-                ('message_set', Bytes)))))
+                ('records', Bytes)))))
    )
@@ -29,7 +29,7 @@ class FetchResponse_v1(Response):
                ('partition', Int32),
                ('error_code', Int16),
                ('highwater_offset', Int64),
-                ('message_set', Bytes)))))
+                ('records', Bytes)))))
    )
@@ -46,6 +46,7 @@ class FetchResponse_v3(Response):


 class FetchResponse_v4(Response):
+    # Adds message format v2
    API_KEY = 1
    API_VERSION = 4
    SCHEMA = Schema(
@@ -60,7 +61,7 @@ class FetchResponse_v4(Response):
                ('aborted_transactions', Array(
                    ('producer_id', Int64),
                    ('first_offset', Int64))),
-                ('message_set', Bytes)))))
+                ('records', Bytes)))))
    )
@@ -80,7 +81,7 @@ class FetchResponse_v5(Response):
                ('aborted_transactions', Array(
                    ('producer_id', Int64),
                    ('first_offset', Int64))),
-                ('message_set', Bytes)))))
+                ('records', Bytes)))))
    )
@@ -115,7 +116,7 @@ class FetchResponse_v7(Response):
                ('aborted_transactions', Array(
                    ('producer_id', Int64),
                    ('first_offset', Int64))),
-                ('message_set', Bytes)))))
+                ('records', Bytes)))))
    )
@@ -156,7 +157,7 @@ class FetchResponse_v11(Response):
                    ('producer_id', Int64),
                    ('first_offset', Int64))),
                ('preferred_read_replica', Int32),
-                ('message_set', Bytes)))))
+                ('records', Bytes)))))
    )
@@ -211,6 +212,7 @@ class FetchRequest_v3(Request):

 class FetchRequest_v4(Request):
    # Adds isolation_level field
+    # Adds message format v2
    API_KEY = 1
    API_VERSION = 4
    RESPONSE_TYPE = FetchResponse_v4
@@ -264,7 +266,7 @@ class FetchRequest_v6(Request):

 class FetchRequest_v7(Request):
    """
-    Add incremental fetch requests
+    Add incremental fetch requests (see KIP-227)
    """
    API_KEY = 1
    API_VERSION = 7
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
index 9b3f6bf55..3076a2810 100644
--- a/kafka/protocol/produce.py
+++ b/kafka/protocol/produce.py
@@ -47,6 +47,7 @@ class ProduceResponse_v2(Response):


 class ProduceResponse_v3(Response):
+    # Adds support for message format v2
    API_KEY = 0
    API_VERSION = 3
    SCHEMA = ProduceResponse_v2.SCHEMA
@@ -141,7 +142,7 @@ class ProduceRequest_v0(ProduceRequest):
            ('topic', String('utf-8')),
            ('partitions', Array(
                ('partition', Int32),
-                ('messages', Bytes)))))
+                ('records', Bytes)))))
    )
@@ -158,6 +159,7 @@ class ProduceRequest_v2(ProduceRequest):


 class ProduceRequest_v3(ProduceRequest):
+    # Adds support for message format v2
    API_VERSION = 3
    RESPONSE_TYPE = ProduceResponse_v3
    SCHEMA = Schema(
@@ -168,7 +170,7 @@ class ProduceRequest_v3(ProduceRequest):
            ('topic', String('utf-8')),
            ('partitions', Array(
                ('partition', Int32),
-                ('messages', Bytes)))))
+                ('records', Bytes)))))
    )
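On the wire nothing changes here: the renamed `records` field is still an opaque `Bytes` blob. The rename just reflects that, since message format v2, the payload is a sequence of record batches rather than a legacy message set; `MemoryRecords` dispatches on the magic byte and handles all formats. A hedged sketch of consuming that field (assumes the `has_next`/`next_batch` interface used elsewhere in this diff; CRC and error handling omitted):

```python
from kafka.record import MemoryRecords

def iter_fetched(records_bytes):
    """Yield (offset, key, value) from the raw 'records' field of one
    FetchResponse partition."""
    memory_records = MemoryRecords(records_bytes)
    while memory_records.has_next():
        batch = memory_records.next_batch()
        for record in batch:  # works for legacy and v2 batches alike
            yield record.offset, record.key, record.value
```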
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index bbc5b0c85..c9b424d54 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -399,16 +399,16 @@ def test__handle_fetch_error(fetcher, caplog, exception, log_level):
    assert caplog.records[0].levelname == logging.getLevelName(log_level)


-def test__unpack_message_set(fetcher):
+def test__unpack_records(fetcher):
    fetcher.config['check_crcs'] = False
    tp = TopicPartition('foo', 0)
    messages = [
        (None, b"a", None),
        (None, b"b", None),
        (None, b"c", None),
    ]
    memory_records = MemoryRecords(_build_record_batch(messages))
-    records = list(fetcher._unpack_message_set(tp, memory_records))
+    records = list(fetcher._unpack_records(tp, memory_records))
    assert len(records) == 3
    assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
    assert records[0].value == b'a'
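`_build_record_batch` is the test helper that serializes the `(key, value, timestamp)` tuples into raw record-batch bytes; it presumably wraps `MemoryRecordsBuilder`, roughly like the sketch below (the builder arguments and magic version are assumptions based on the kafka.record API, not taken from this diff):

```python
from kafka.record.memory_records import MemoryRecordsBuilder

def build_record_batch_sketch(msgs, magic=1):
    """Serialize (key, value, timestamp) tuples into record-batch bytes."""
    builder = MemoryRecordsBuilder(
        magic=magic, compression_type=0, batch_size=9999999)
    for key, value, timestamp in msgs:
        builder.append(key=key, value=value, timestamp=timestamp, headers=[])
    builder.close()   # seal the batch so the buffer is complete
    return builder.buffer()
```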