Commit ac8e42b

OffsetAndTimestamp; FetchRequest v10

1 parent 2edb8fd

7 files changed (+108, -65 lines)

kafka/consumer/fetcher.py

Lines changed: 60 additions & 45 deletions
@@ -18,7 +18,7 @@
 )
 from kafka.record import MemoryRecords
 from kafka.serializer import Deserializer
-from kafka.structs import TopicPartition, OffsetAndTimestamp
+from kafka.structs import TopicPartition, OffsetAndMetadata, OffsetAndTimestamp
 
 log = logging.getLogger(__name__)
 
@@ -28,7 +28,7 @@
 READ_COMMITTED = 1
 
 ConsumerRecord = collections.namedtuple("ConsumerRecord",
-    ["topic", "partition", "offset", "timestamp", "timestamp_type",
+    ["topic", "partition", "leader_epoch", "offset", "timestamp", "timestamp_type",
     "key", "value", "headers", "checksum", "serialized_key_size", "serialized_value_size", "serialized_header_size"])
 
 
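The fetcher's ConsumerRecord now carries the leader epoch of the batch each record came from, inserted as the third field. A minimal sketch of the new shape (all field values below are illustrative, not from the commit):

    from kafka.consumer.fetcher import ConsumerRecord

    record = ConsumerRecord(
        topic='example-topic', partition=0,
        leader_epoch=5,  # new field; -1 when the batch header has no epoch
        offset=123, timestamp=1690000000000, timestamp_type=0,
        key=b'k', value=b'v', headers=[], checksum=None,
        serialized_key_size=1, serialized_value_size=1, serialized_header_size=-1)

Any caller that unpacks ConsumerRecord positionally must account for the new field.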
@@ -198,9 +198,6 @@ def get_offsets_by_times(self, timestamps, timeout_ms):
         for tp in timestamps:
             if tp not in offsets:
                 offsets[tp] = None
-            else:
-                offset, timestamp = offsets[tp]
-                offsets[tp] = OffsetAndTimestamp(offset, timestamp)
         return offsets
 
     def beginning_offsets(self, partitions, timeout_ms):
@@ -215,7 +212,7 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
         timestamps = dict([(tp, timestamp) for tp in partitions])
         offsets = self._retrieve_offsets(timestamps, timeout_ms)
         for tp in timestamps:
-            offsets[tp] = offsets[tp][0]
+            offsets[tp] = offsets[tp].offset
         return offsets
 
     def _reset_offset(self, partition):
@@ -240,7 +237,7 @@ def _reset_offset(self, partition):
         offsets = self._retrieve_offsets({partition: timestamp})
 
         if partition in offsets:
-            offset = offsets[partition][0]
+            offset = offsets[partition].offset
 
             # we might lose the assignment while fetching the offset,
             # so check it is still active
@@ -261,8 +258,8 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
             available. Otherwise timestamp is treated as epoch milliseconds.
 
         Returns:
-            {TopicPartition: (int, int)}: Mapping of partition to
-                retrieved offset and timestamp. If offset does not exist for
+            {TopicPartition: OffsetAndTimestamp}: Mapping of partition to
+                retrieved offset, timestamp, and leader_epoch. If offset does not exist for
                 the provided timestamp, that partition will be missing from
                 this mapping.
         """
@@ -373,28 +370,29 @@ def _append(self, drained, part, max_records, update_offsets):
                 log.debug("Not returning fetched records for assigned partition"
                           " %s since it is no longer fetchable", tp)
 
-            elif fetch_offset == position:
+            elif fetch_offset == position.offset:
                 # we are ensured to have at least one record since we already checked for emptiness
                 part_records = part.take(max_records)
                 next_offset = part_records[-1].offset + 1
+                leader_epoch = part_records[-1].leader_epoch
 
                 log.log(0, "Returning fetched records at offset %d for assigned"
-                           " partition %s and update position to %s", position,
-                           tp, next_offset)
+                           " partition %s and update position to %s (leader epoch %s)", position.offset,
+                           tp, next_offset, leader_epoch)
 
                 for record in part_records:
                     drained[tp].append(record)
 
                 if update_offsets:
-                    self._subscriptions.assignment[tp].position = next_offset
+                    self._subscriptions.assignment[tp].position = OffsetAndMetadata(next_offset, b'', leader_epoch)
                 return len(part_records)
 
             else:
                 # these records aren't next in line based on the last consumed
                 # position, ignore them they must be from an obsolete request
                 log.debug("Ignoring fetched records for %s at offset %s since"
                           " the current position is %d", tp, part.fetch_offset,
-                          position)
+                          position.offset)
 
             part.discard()
             return 0
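The consumer's cached position changes type here: a bare int becomes an OffsetAndMetadata(offset, metadata, leader_epoch) triple, which is why every comparison and log statement in this hunk reads position.offset. A minimal sketch of the update, assuming a Fetcher instance and tp, next_offset, leader_epoch in scope as in the code above:

    from kafka.structs import OffsetAndMetadata

    state = fetcher._subscriptions.assignment[tp]
    state.position = OffsetAndMetadata(next_offset, b'', leader_epoch)
    assert state.position.offset == next_offset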
@@ -444,13 +442,13 @@ def _message_generator(self):
                 break
 
             # Compressed messagesets may include earlier messages
-            elif msg.offset < self._subscriptions.assignment[tp].position:
+            elif msg.offset < self._subscriptions.assignment[tp].position.offset:
                 log.debug("Skipping message offset: %s (expecting %s)",
                           msg.offset,
-                          self._subscriptions.assignment[tp].position)
+                          self._subscriptions.assignment[tp].position.offset)
                 continue
 
-            self._subscriptions.assignment[tp].position = msg.offset + 1
+            self._subscriptions.assignment[tp].position = OffsetAndMetadata(msg.offset + 1, b'', -1)
             yield msg
 
         self._next_partition_records = None
@@ -463,8 +461,9 @@ def _unpack_records(self, tp, records):
             # Try DefaultsRecordBatch / message log format v2
             # base_offset, last_offset_delta, and control batches
             try:
-                self._subscriptions.assignment[tp].last_offset_from_record_batch = batch.base_offset + \
-                    batch.last_offset_delta
+                batch_offset = batch.base_offset + batch.last_offset_delta
+                leader_epoch = batch.leader_epoch
+                self._subscriptions.assignment[tp].last_offset_from_record_batch = batch_offset
                 # Control batches have a single record indicating whether a transaction
                 # was aborted or committed.
                 # When isolation_level is READ_COMMITTED (currently unsupported)
@@ -475,6 +474,7 @@ def _unpack_records(self, tp, records):
                     batch = records.next_batch()
                     continue
             except AttributeError:
+                leader_epoch = -1
                 pass
 
             for record in batch:
@@ -491,7 +491,7 @@ def _unpack_records(self, tp, records):
                     len(h_key.encode("utf-8")) + (len(h_val) if h_val is not None else 0) for h_key, h_val in
                     headers) if headers else -1
                 yield ConsumerRecord(
-                    tp.topic, tp.partition, record.offset, record.timestamp,
+                    tp.topic, tp.partition, leader_epoch, record.offset, record.timestamp,
                     record.timestamp_type, key, value, headers, record.checksum,
                     key_size, value_size, header_size)
@@ -577,7 +577,9 @@ def _send_list_offsets_request(self, node_id, timestamps):
         version = self._client.api_version(ListOffsetsRequest, max_version=3)
         by_topic = collections.defaultdict(list)
         for tp, timestamp in six.iteritems(timestamps):
-            if version >= 1:
+            if version >= 4:
+                data = (tp.partition, leader_epoch, timestamp)
+            elif version >= 1:
                 data = (tp.partition, timestamp)
             else:
                 data = (tp.partition, timestamp, 1)
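ListOffsetsRequest encodes a different per-partition tuple at each protocol version: v0 takes (partition, timestamp, max_num_offsets), v1-v3 take (partition, timestamp), and v4+ adds the current leader epoch. Note that max_version=3 above keeps the v4+ branch dormant for now (as written, it would also need a leader_epoch value in scope). A sketch of the three shapes with illustrative values:

    partition, timestamp, leader_epoch = 0, -1, 5

    data_v0 = (partition, timestamp, 1)             # trailing 1 == max_num_offsets
    data_v1 = (partition, timestamp)                # v1-v3
    data_v4 = (partition, leader_epoch, timestamp)  # v4+, epoch-aware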
@@ -628,17 +630,18 @@ def _handle_list_offsets_response(self, future, response):
                     offset = UNKNOWN_OFFSET
                 else:
                     offset = offsets[0]
-                log.debug("Handling v0 ListOffsetsResponse response for %s. "
-                          "Fetched offset %s", partition, offset)
-                if offset != UNKNOWN_OFFSET:
-                    timestamp_offset_map[partition] = (offset, None)
-            else:
+                timestamp = None
+                leader_epoch = -1
+            elif response.API_VERSION <= 3:
                 timestamp, offset = partition_info[2:]
-                log.debug("Handling ListOffsetsResponse response for %s. "
-                          "Fetched offset %s, timestamp %s",
-                          partition, offset, timestamp)
-                if offset != UNKNOWN_OFFSET:
-                    timestamp_offset_map[partition] = (offset, timestamp)
+                leader_epoch = -1
+            else:
+                timestamp, offset, leader_epoch = partition_info[2:]
+            log.debug("Handling ListOffsetsResponse response for %s. "
+                      "Fetched offset %s, timestamp %s, leader_epoch %s",
+                      partition, offset, timestamp, leader_epoch)
+            if offset != UNKNOWN_OFFSET:
+                timestamp_offset_map[partition] = OffsetAndTimestamp(offset, timestamp, leader_epoch)
             elif error_type is Errors.UnsupportedForMessageFormatError:
                 # The message format on the broker side is before 0.10.0,
                 # we simply put None in the response.
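The response handler now funnels every API version into a single OffsetAndTimestamp result; v0 has no timestamp and pre-v4 has no leader epoch, so those default to None and -1 respectively. A sketch of the per-version unpacking with an illustrative v4-style partition_info tuple:

    from kafka.structs import OffsetAndTimestamp

    api_version = 4
    partition_info = (0, 0, 1690000000000, 42, 5)  # partition, error_code, timestamp, offset, leader_epoch

    if api_version == 0:
        offset = 42            # v0 carries a list of offsets instead
        timestamp, leader_epoch = None, -1
    elif api_version <= 3:
        timestamp, offset = partition_info[2:]
        leader_epoch = -1
    else:
        timestamp, offset, leader_epoch = partition_info[2:]

    result = OffsetAndTimestamp(offset, timestamp, leader_epoch)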
@@ -686,7 +689,7 @@ def _create_fetch_requests(self):
         """
         # create the fetch info as a dict of lists of partition info tuples
         # which can be passed to FetchRequest() via .items()
-        version = self._client.api_version(FetchRequest, max_version=8)
+        version = self._client.api_version(FetchRequest, max_version=10)
         fetchable = collections.defaultdict(dict)
 
         for partition in self._fetchable_partitions():
@@ -695,12 +698,12 @@ def _create_fetch_requests(self):
             # advance position for any deleted compacted messages if required
             if self._subscriptions.assignment[partition].last_offset_from_record_batch:
                 next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_record_batch + 1
-                if next_offset_from_batch_header > self._subscriptions.assignment[partition].position:
+                if next_offset_from_batch_header > self._subscriptions.assignment[partition].position.offset:
                     log.debug(
                         "Advance position for partition %s from %s to %s (last record batch location plus one)"
                         " to correct for deleted compacted messages and/or transactional control records",
-                        partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header)
-                    self._subscriptions.assignment[partition].position = next_offset_from_batch_header
+                        partition, self._subscriptions.assignment[partition].position.offset, next_offset_from_batch_header)
+                    self._subscriptions.assignment[partition].position = OffsetAndMetadata(next_offset_from_batch_header, b'', -1)
 
             position = self._subscriptions.assignment[partition].position
 
@@ -718,19 +721,28 @@ def _create_fetch_requests(self):
             if version < 5:
                 partition_info = (
                     partition.partition,
-                    position,
+                    position.offset,
                     self.config['max_partition_fetch_bytes']
                 )
+            elif version <= 8:
+                partition_info = (
+                    partition.partition,
+                    position.offset,
+                    -1, # log_start_offset is used internally by brokers / replicas only
+                    self.config['max_partition_fetch_bytes'],
+                )
             else:
                 partition_info = (
                     partition.partition,
-                    position,
+                    position.leader_epoch,
+                    position.offset,
                     -1, # log_start_offset is used internally by brokers / replicas only
                     self.config['max_partition_fetch_bytes'],
                 )
+
             fetchable[node_id][partition] = partition_info
             log.debug("Adding fetch request for partition %s at offset %d",
-                      partition, position)
+                      partition, position.offset)
 
         requests = {}
         for node_id, next_partitions in six.iteritems(fetchable):
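FetchRequest's per-partition tuple likewise grows with the version: v0-v4 send (partition, offset, max_bytes), v5-v8 insert log_start_offset, and v9+ prepend current_leader_epoch before the fetch offset. A sketch of the three shapes with illustrative values (max_bytes standing in for config['max_partition_fetch_bytes']):

    partition, offset, leader_epoch, max_bytes = 0, 123, 5, 1048576

    info_v0 = (partition, offset, max_bytes)                    # version < 5
    info_v5 = (partition, offset, -1, max_bytes)                # versions 5-8; -1 log_start_offset
    info_v9 = (partition, leader_epoch, offset, -1, max_bytes)  # version >= 9

The v9+ shape is why the fetch offset moves from index 1 to index 2, which the next hunk accounts for.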
@@ -778,7 +790,10 @@ def _create_fetch_requests(self):
 
             fetch_offsets = {}
             for tp, partition_data in six.iteritems(next_partitions):
-                offset = partition_data[1]
+                if version <= 8:
+                    offset = partition_data[1]
+                else:
+                    offset = partition_data[2]
                 fetch_offsets[tp] = offset
 
             requests[node_id] = (request, fetch_offsets)
@@ -807,7 +822,7 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response):
                 tp = TopicPartition(topic, partition_data[0])
                 fetch_offset = fetch_offsets[tp]
                 completed_fetch = CompletedFetch(
-                    tp, fetch_offsets[tp],
+                    tp, fetch_offset,
                     response.API_VERSION,
                     partition_data[1:],
                     metric_aggregator
@@ -847,18 +862,18 @@ def _parse_fetched_data(self, completed_fetch):
             # Note that the *response* may return a messageset that starts
             # earlier (e.g., compressed messages) or later (e.g., compacted topic)
             position = self._subscriptions.assignment[tp].position
-            if position is None or position != fetch_offset:
+            if position is None or position.offset != fetch_offset:
                 log.debug("Discarding fetch response for partition %s"
                           " since its offset %d does not match the"
                           " expected offset %d", tp, fetch_offset,
-                          position)
+                          position.offset)
                 return None
 
             records = MemoryRecords(completed_fetch.partition_data[-1])
             if records.has_next():
                 log.debug("Adding fetched record for partition %s with"
                           " offset %d to buffered record list", tp,
-                          position)
+                          position.offset)
                 unpacked = list(self._unpack_records(tp, records))
                 parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
                 if unpacked:
@@ -889,10 +904,10 @@ def _parse_fetched_data(self, completed_fetch):
                 self._client.cluster.request_update()
             elif error_type is Errors.OffsetOutOfRangeError:
                 position = self._subscriptions.assignment[tp].position
-                if position is None or position != fetch_offset:
+                if position is None or position.offset != fetch_offset:
                     log.debug("Discarding stale fetch response for partition %s"
                               " since the fetched offset %d does not match the"
-                              " current offset %d", tp, fetch_offset, position)
+                              " current offset %d", tp, fetch_offset, position.offset)
                 elif self._subscriptions.has_default_offset_reset_policy():
                     log.info("Fetch offset %s is out of range for topic-partition %s", fetch_offset, tp)
                     self._subscriptions.need_offset_reset(tp)
kafka/consumer/group.py

Lines changed: 5 additions & 5 deletions
@@ -17,7 +17,7 @@
 from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
 from kafka.metrics import MetricConfig, Metrics
 from kafka.protocol.list_offsets import OffsetResetStrategy
-from kafka.structs import TopicPartition
+from kafka.structs import OffsetAndMetadata, TopicPartition
 from kafka.version import __version__
 
 log = logging.getLogger(__name__)
@@ -737,11 +737,11 @@ def position(self, partition):
         if not isinstance(partition, TopicPartition):
             raise TypeError('partition must be a TopicPartition namedtuple')
         assert self._subscription.is_assigned(partition), 'Partition is not assigned'
-        offset = self._subscription.assignment[partition].position
+        position = self._subscription.assignment[partition].position
         if offset is None:
             self._update_fetch_positions([partition])
-            offset = self._subscription.assignment[partition].position
-        return offset
+            position = self._subscription.assignment[partition].position
+        return position.offset
 
     def highwater(self, partition):
         """Last known highwater offset for a partition.
@@ -1144,7 +1144,7 @@ def _message_generator_v2(self):
                 log.debug("Not returning fetched records for partition %s"
                           " since it is no longer fetchable", tp)
                 break
-            self._subscription.assignment[tp].position = record.offset + 1
+            self._subscription.assignment[tp].position = OffsetAndMetadata(record.offset + 1, b'', -1)
             yield record
 
     def _message_generator(self):

kafka/consumer/subscription_state.py

Lines changed: 4 additions & 3 deletions
@@ -319,7 +319,7 @@ def all_consumed_offsets(self):
         all_consumed = {}
         for partition, state in six.iteritems(self.assignment):
             if state.has_valid_position:
-                all_consumed[partition] = OffsetAndMetadata(state.position, b'', -1)
+                all_consumed[partition] = state.position
         return all_consumed
 
     def need_offset_reset(self, partition, offset_reset_strategy=None):
@@ -379,7 +379,7 @@ def __init__(self):
         self.paused = False # whether this partition has been paused by the user
         self.awaiting_reset = False # whether we are awaiting reset
         self.reset_strategy = None # the reset strategy if awaitingReset is set
-        self._position = None # offset exposed to the user
+        self._position = None # OffsetAndMetadata exposed to the user
         self.highwater = None
         self.drop_pending_record_batch = False
         # The last message offset hint available from a record batch with
@@ -388,6 +388,7 @@ def __init__(self):
 
     def _set_position(self, offset):
         assert self.has_valid_position, 'Valid position required'
+        assert isinstance(offset, OffsetAndMetadata)
         self._position = offset
 
     def _get_position(self):
@@ -403,7 +404,7 @@ def await_reset(self, strategy):
         self.has_valid_position = False
 
     def seek(self, offset):
-        self._position = offset
+        self._position = OffsetAndMetadata(offset, b'', -1)
         self.awaiting_reset = False
         self.reset_strategy = None
         self.has_valid_position = True
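seek() still takes a plain integer from callers and wraps it internally, defaulting to empty metadata and an unknown (-1) leader epoch, while _set_position now asserts it only ever stores OffsetAndMetadata. A sketch of the effect, assuming a TopicPartitionState instance:

    from kafka.structs import OffsetAndMetadata

    state.seek(42)
    assert state.position == OffsetAndMetadata(42, b'', -1)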

kafka/record/default_records.py

Lines changed: 4 additions & 0 deletions
@@ -136,6 +136,10 @@ def __init__(self, buffer):
     def base_offset(self):
         return self._header_data[0]
 
+    @property
+    def leader_epoch(self):
+        return self._header_data[2]
+
     @property
     def magic(self):
         return self._header_data[3]
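In the v2 (DefaultRecordBatch) header the fields decode in order base_offset, batch_length, partition_leader_epoch, magic, ..., so the new property exposes index 2 alongside the existing base_offset (index 0) and magic (index 3) accessors. A sketch of reading it from fetched bytes, assuming raw_bytes holds a v2 message set:

    from kafka.record import MemoryRecords

    records = MemoryRecords(raw_bytes)
    batch = records.next_batch()
    epoch = getattr(batch, 'leader_epoch', -1)  # legacy v0/v1 batches lack the attribute

The getattr fallback mirrors the AttributeError handling added to _unpack_records above.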

kafka/structs.py

Lines changed: 2 additions & 1 deletion
@@ -66,9 +66,10 @@
 Keyword Arguments:
     offset (int): An offset
     timestamp (int): The timestamp associated to the offset
+    leader_epoch (int): The last known epoch from the leader / broker
 """
 OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",
-    ["offset", "timestamp"])
+    ["offset", "timestamp", "leader_epoch"])
 
 MemberInformation = namedtuple("MemberInformation",
     ["member_id", "client_id", "client_host", "member_metadata", "member_assignment"])

test/test_consumer_integration.py

Lines changed: 2 additions & 2 deletions
@@ -259,8 +259,8 @@ def test_kafka_consumer_offsets_search_many_partitions(kafka_consumer, kafka_pro
     })
 
     assert offsets == {
-        tp0: OffsetAndTimestamp(p0msg.offset, send_time),
-        tp1: OffsetAndTimestamp(p1msg.offset, send_time)
+        tp0: OffsetAndTimestamp(p0msg.offset, send_time, -1),
+        tp1: OffsetAndTimestamp(p1msg.offset, send_time, -1)
     }
 
     offsets = consumer.beginning_offsets([tp0, tp1])
