Skip to content

Commit 414e286

Browse files
committed
Added beginning_offsets and end_offsets API's and fixed @jeffwidman review issues
1 parent c277ac8 commit 414e286

File tree

4 files changed

+142
-17
lines changed

4 files changed

+142
-17
lines changed

kafka/conn.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -875,7 +875,7 @@ def _next_correlation_id(self):
875875

876876
def _check_api_version_response(self, response):
877877
# The logic here is to check the list of supported request versions
878-
# in descending order. As soon as we find one that works, return it
878+
# in reverse order. As soon as we find one that works, return it
879879
test_cases = [
880880
# format (<broker verion>, <needed struct>)
881881
((0, 10, 1), MetadataRequest[2]),

kafka/consumer/fetcher.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,21 @@ def get_offsets_by_times(self, timestamps, timeout_ms):
192192
offsets[tp] = OffsetAndTimestamp(offset, timestamp)
193193
return offsets
194194

195+
def beginning_offsets(self, partitions, timeout_ms):
196+
return self.beginning_or_end_offset(
197+
partitions, OffsetResetStrategy.EARLIEST, timeout_ms)
198+
199+
def end_offsets(self, partitions, timeout_ms):
200+
return self.beginning_or_end_offset(
201+
partitions, OffsetResetStrategy.LATEST, timeout_ms)
202+
203+
def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
204+
timestamps = dict([(tp, timestamp) for tp in partitions])
205+
offsets = self._retrieve_offsets(timestamps, timeout_ms)
206+
for tp in timestamps:
207+
offsets[tp] = offsets[tp][0]
208+
return offsets
209+
195210
def _reset_offset(self, partition):
196211
"""Reset offsets for the given partition using the offset reset strategy.
197212
@@ -221,10 +236,10 @@ def _reset_offset(self, partition):
221236
self._subscriptions.seek(partition, offset)
222237

223238
def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
224-
""" Fetch offset for each partition passed in ``timestamps`` map.
239+
"""Fetch offset for each partition passed in ``timestamps`` map.
225240
226241
Blocks until offsets are obtained, a non-retriable exception is raised
227-
or ``timeout_ms`` passed (if it's not ``None``).
242+
or ``timeout_ms`` passed.
228243
229244
Arguments:
230245
timestamps: {TopicPartition: int} dict with timestamps to fetch
@@ -267,7 +282,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
267282
remaining_ms = timeout_ms - elapsed_ms
268283

269284
raise Errors.KafkaTimeoutError(
270-
"Failed to get offsets by times in %s ms" % timeout_ms)
285+
"Failed to get offsets by timestamps in %s ms" % timeout_ms)
271286

272287
def _raise_if_offset_out_of_range(self):
273288
"""Check FetchResponses for offset out of range.
@@ -606,7 +621,7 @@ def _deserialize(self, f, topic, bytes_):
606621
return f(bytes_)
607622

608623
def _send_offset_requests(self, timestamps):
609-
""" Fetch offsets for each partition in timestamps dict. This may send
624+
"""Fetch offsets for each partition in timestamps dict. This may send
610625
request to multiple nodes, based on who is Leader for partition.
611626
612627
Arguments:

kafka/consumer/group.py

Lines changed: 76 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -862,33 +862,37 @@ def metrics(self, raw=False):
862862
return metrics
863863

864864
def offsets_for_times(self, timestamps):
865-
"""
866-
Look up the offsets for the given partitions by timestamp. The returned
867-
offset for each partition is the earliest offset whose timestamp is
868-
greater than or equal to the given timestamp in the corresponding
869-
partition.
865+
"""Look up the offsets for the given partitions by timestamp. The
866+
returned offset for each partition is the earliest offset whose
867+
timestamp is greater than or equal to the given timestamp in the
868+
corresponding partition.
870869
871870
This is a blocking call. The consumer does not have to be assigned the
872871
partitions.
873872
874873
If the message format version in a partition is before 0.10.0, i.e.
875874
the messages do not have timestamps, ``None`` will be returned for that
876-
partition.
875+
partition. ``None`` will also be returned for the partition if there
876+
are no messages in it.
877877
878878
Note:
879-
Notice that this method may block indefinitely if the partition
880-
does not exist.
879+
This method may block indefinitely if the partition does not exist.
881880
882881
Arguments:
883882
timestamps (dict): ``{TopicPartition: int}`` mapping from partition
884883
to the timestamp to look up. Unit should be milliseconds since
885884
beginning of the epoch (midnight Jan 1, 1970 (UTC))
886885
886+
Returns:
887+
``{TopicPartition: OffsetAndTimestamp}``: mapping from partition
888+
to the timestamp and offset of the first message with timestamp
889+
greater than or equal to the target timestamp.
890+
887891
Raises:
888-
ValueError: if the target timestamp is negative
889-
UnsupportedVersionError: if the broker does not support looking
892+
ValueError: If the target timestamp is negative
893+
UnsupportedVersionError: If the broker does not support looking
890894
up the offsets by timestamp.
891-
KafkaTimeoutError: if fetch failed in request_timeout_ms
895+
KafkaTimeoutError: If fetch failed in request_timeout_ms
892896
"""
893897
if self.config['api_version'] <= (0, 10, 0):
894898
raise UnsupportedVersionError(
@@ -903,6 +907,67 @@ def offsets_for_times(self, timestamps):
903907
return self._fetcher.get_offsets_by_times(
904908
timestamps, self.config['request_timeout_ms'])
905909

910+
def beginning_offsets(self, partitions):
911+
"""Get the first offset for the given partitions.
912+
913+
This method does not change the current consumer position of the
914+
partitions.
915+
916+
Note:
917+
This method may block indefinitely if the partition does not exist.
918+
919+
Arguments:
920+
partitions (list): List of TopicPartition instances to fetch
921+
offsets for.
922+
923+
Returns:
924+
``{TopicPartition: int}``: The earliest available offsets for the
925+
given partitions.
926+
927+
Raises:
928+
UnsupportedVersionError: If the broker does not support looking
929+
up the offsets by timestamp.
930+
KafkaTimeoutError: If fetch failed in request_timeout_ms.
931+
"""
932+
if self.config['api_version'] <= (0, 10, 0):
933+
raise UnsupportedVersionError(
934+
"offsets_for_times API not supported for cluster version {}"
935+
.format(self.config['api_version']))
936+
offsets = self._fetcher.beginning_offsets(
937+
partitions, self.config['request_timeout_ms'])
938+
return offsets
939+
940+
def end_offsets(self, partitions):
941+
"""Get the last offset for the given partitions. The last offset of a
942+
partition is the offset of the upcoming message, i.e. the offset of the
943+
last available message + 1.
944+
945+
This method does not change the current consumer position of the
946+
partitions.
947+
948+
Note:
949+
This method may block indefinitely if the partition does not exist.
950+
951+
Arguments:
952+
partitions (list): List of TopicPartition instances to fetch
953+
offsets for.
954+
955+
Returns:
956+
``{TopicPartition: int}``: The end offsets for the given partitions.
957+
958+
Raises:
959+
UnsupportedVersionError: If the broker does not support looking
960+
up the offsets by timestamp.
961+
KafkaTimeoutError: If fetch failed in request_timeout_ms
962+
"""
963+
if self.config['api_version'] <= (0, 10, 0):
964+
raise UnsupportedVersionError(
965+
"offsets_for_times API not supported for cluster version {}"
966+
.format(self.config['api_version']))
967+
offsets = self._fetcher.end_offsets(
968+
partitions, self.config['request_timeout_ms'])
969+
return offsets
970+
906971
def _use_consumer_group(self):
907972
"""Return True iff this consumer can/should join a broker-coordinated group."""
908973
if self.config['api_version'] < (0, 9):

test/test_consumer_integration.py

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
)
1313
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
1414
from kafka.errors import (
15-
ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError
15+
ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError,
16+
KafkaTimeoutError
1617
)
1718
from kafka.structs import (
1819
ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
@@ -666,6 +667,9 @@ def test_kafka_consumer_offsets_for_time(self):
666667
self.assertEqual(offsets[tp].offset, late_msg.offset)
667668
self.assertEqual(offsets[tp].timestamp, late_time)
668669

670+
offsets = consumer.offsets_for_times({})
671+
self.assertEqual(offsets, {})
672+
669673
# Out of bound timestamps check
670674

671675
offsets = consumer.offsets_for_times({tp: 0})
@@ -675,6 +679,17 @@ def test_kafka_consumer_offsets_for_time(self):
675679
offsets = consumer.offsets_for_times({tp: 9999999999999})
676680
self.assertEqual(offsets[tp], None)
677681

682+
# Beginning/End offsets
683+
684+
offsets = consumer.beginning_offsets([tp])
685+
self.assertEqual(offsets, {
686+
tp: early_msg.offset,
687+
})
688+
offsets = consumer.end_offsets([tp])
689+
self.assertEqual(offsets, {
690+
tp: late_msg.offset + 1
691+
})
692+
678693
@kafka_versions('>=0.10.1')
679694
def test_kafka_consumer_offsets_search_many_partitions(self):
680695
tp0 = TopicPartition(self.topic, 0)
@@ -700,10 +715,40 @@ def test_kafka_consumer_offsets_search_many_partitions(self):
700715
tp1: OffsetAndTimestamp(p1msg.offset, send_time)
701716
})
702717

718+
offsets = consumer.beginning_offsets([tp0, tp1])
719+
self.assertEqual(offsets, {
720+
tp0: p0msg.offset,
721+
tp1: p1msg.offset
722+
})
723+
724+
offsets = consumer.end_offsets([tp0, tp1])
725+
self.assertEqual(offsets, {
726+
tp0: p0msg.offset + 1,
727+
tp1: p1msg.offset + 1
728+
})
729+
703730
@kafka_versions('<0.10.1')
704731
def test_kafka_consumer_offsets_for_time_old(self):
705732
consumer = self.kafka_consumer()
706733
tp = TopicPartition(self.topic, 0)
707734

708735
with self.assertRaises(UnsupportedVersionError):
709736
consumer.offsets_for_times({tp: int(time.time())})
737+
738+
with self.assertRaises(UnsupportedVersionError):
739+
consumer.beginning_offsets([tp])
740+
741+
with self.assertRaises(UnsupportedVersionError):
742+
consumer.end_offsets([tp])
743+
744+
@kafka_versions('<0.10.1')
745+
def test_kafka_consumer_offsets_for_times_errors(self):
746+
consumer = self.kafka_consumer()
747+
tp = TopicPartition(self.topic, 0)
748+
bad_tp = TopicPartition(self.topic, 100)
749+
750+
with self.assertRaises(ValueError):
751+
consumer.offsets_for_times({tp: -1})
752+
753+
with self.assertRaises(KafkaTimeoutError):
754+
consumer.offsets_for_times({bad_tp: 0})

0 commit comments

Comments
 (0)