@@ -585,12 +585,11 @@ def _poll_once(self, timeout_ms, max_records):
585
585
dict: Map of topic to list of records (may be empty).
586
586
"""
587
587
if self ._use_consumer_group ():
588
- self ._coordinator .ensure_coordinator_known ()
589
588
self ._coordinator .ensure_active_group ()
590
589
591
590
# 0.8.2 brokers support kafka-backed offset storage via group coordinator
592
591
elif self .config ['group_id' ] is not None and self .config ['api_version' ] >= (0 , 8 , 2 ):
593
- self ._coordinator .ensure_coordinator_known ()
592
+ self ._coordinator .ensure_coordinator_ready ()
594
593
595
594
# Fetch positions if we have partitions we're subscribed to that we
596
595
# don't know the offset for
@@ -835,6 +834,8 @@ def subscription(self):
835
834
Returns:
836
835
set: {topic, ...}
837
836
"""
837
+ if self ._subscription .subscription is None :
838
+ return None
838
839
return self ._subscription .subscription .copy ()
839
840
840
841
def unsubscribe (self ):
@@ -988,26 +989,34 @@ def _update_fetch_positions(self, partitions):
988
989
NoOffsetForPartitionError: If no offset is stored for a given
989
990
partition and no offset reset policy is defined.
990
991
"""
991
- if (self .config ['api_version' ] >= (0 , 8 , 1 ) and
992
- self .config ['group_id' ] is not None ):
992
+ # Lookup any positions for partitions which are awaiting reset (which may be the
993
+ # case if the user called seekToBeginning or seekToEnd. We do this check first to
994
+ # avoid an unnecessary lookup of committed offsets (which typically occurs when
995
+ # the user is manually assigning partitions and managing their own offsets).
996
+ self ._fetcher .reset_offsets_if_needed (partitions )
993
997
994
- # Refresh commits for all assigned partitions
995
- self ._coordinator .refresh_committed_offsets_if_needed ()
998
+ if not self ._subscription .has_all_fetch_positions ():
999
+ # if we still don't have offsets for all partitions, then we should either seek
1000
+ # to the last committed position or reset using the auto reset policy
1001
+ if (self .config ['api_version' ] >= (0 , 8 , 1 ) and
1002
+ self .config ['group_id' ] is not None ):
1003
+ # first refresh commits for all assigned partitions
1004
+ self ._coordinator .refresh_committed_offsets_if_needed ()
996
1005
997
- # Then, do any offset lookups in case some positions are not known
998
- self ._fetcher .update_fetch_positions (partitions )
1006
+ # Then, do any offset lookups in case some positions are not known
1007
+ self ._fetcher .update_fetch_positions (partitions )
999
1008
1000
1009
def _message_generator (self ):
1001
1010
assert self .assignment () or self .subscription () is not None , 'No topic subscription or manual partition assignment'
1002
1011
while time .time () < self ._consumer_timeout :
1003
1012
1004
1013
if self ._use_consumer_group ():
1005
- self ._coordinator .ensure_coordinator_known ()
1014
+ self ._coordinator .ensure_coordinator_ready ()
1006
1015
self ._coordinator .ensure_active_group ()
1007
1016
1008
1017
# 0.8.2 brokers support kafka-backed offset storage via group coordinator
1009
1018
elif self .config ['group_id' ] is not None and self .config ['api_version' ] >= (0 , 8 , 2 ):
1010
- self ._coordinator .ensure_coordinator_known ()
1019
+ self ._coordinator .ensure_coordinator_ready ()
1011
1020
1012
1021
# Fetch offsets for any subscribed partitions that we arent tracking yet
1013
1022
if not self ._subscription .has_all_fetch_positions ():
0 commit comments