@@ -349,7 +349,7 @@ def _send_request_to_controller(self, request):
349
349
# one of these attributes and that they always unpack into
350
350
# (topic, error_code) tuples.
351
351
topic_error_tuples = (response .topic_errors if hasattr (response , 'topic_errors' )
352
- else response .topic_error_codes )
352
+ else response .topic_error_codes )
353
353
# Also small py2/py3 compatibility -- py3 can ignore extra values
354
354
# during unpack via: for x, y, *rest in list_of_values. py2 cannot.
355
355
# So for now we have to map across the list and explicitly drop any
@@ -501,8 +501,8 @@ def describe_configs(self, config_resources, include_synonyms=False):
501
501
future = self ._send_request_to_node (self ._client .least_loaded_node (), request )
502
502
503
503
self ._wait_for_futures ([future ])
504
-
505
- return future . value
504
+ response = future . value
505
+ return response
506
506
507
507
@staticmethod
508
508
def _convert_alter_config_resource_request (config_resource ):
@@ -544,8 +544,8 @@ def alter_configs(self, config_resources):
544
544
future = self ._send_request_to_node (self ._client .least_loaded_node (), request )
545
545
546
546
self ._wait_for_futures ([future ])
547
-
548
- return future . value
547
+ response = future . value
548
+ return response
549
549
550
550
# alter replica logs dir protocol not yet implemented
551
551
# Note: have to lookup the broker with the replica assignment and send the request to that broker
@@ -602,6 +602,54 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
602
602
# describe delegation_token protocol not yet implemented
603
603
# Note: send the request to the least_loaded_node()
604
604
605
+ def _describe_consumer_groups_send_request (self , group_id , group_coordinator_id ):
606
+ """Send a DescribeGroupsRequest to the group's coordinator.
607
+
608
+ :param group_id: The group name as a string
609
+ :param group_coordinator_id: The node_id of the groups' coordinator
610
+ broker.
611
+ :return: A message future.
612
+ """
613
+ version = self ._matching_api_version (DescribeGroupsRequest )
614
+ if version <= 1 :
615
+ # Note: KAFKA-6788 A potential optimization is to group the
616
+ # request per coordinator and send one request with a list of
617
+ # all consumer groups. Java still hasn't implemented this
618
+ # because the error checking is hard to get right when some
619
+ # groups error and others don't.
620
+ request = DescribeGroupsRequest [version ](groups = (group_id ,))
621
+ else :
622
+ raise NotImplementedError (
623
+ "Support for DescribeGroupsRequest_v{} has not yet been added to KafkaAdminClient."
624
+ .format (version ))
625
+ return self ._send_request_to_node (group_coordinator_id , request )
626
+
627
+ def _describe_consumer_groups_process_response (self , response ):
628
+ """Process a DescribeGroupsResponse into a group description."""
629
+ if response .API_VERSION <= 1 :
630
+ assert len (response .groups ) == 1
631
+ # TODO need to implement converting the response tuple into
632
+ # a more accessible interface like a namedtuple and then stop
633
+ # hardcoding tuple indices here. Several Java examples,
634
+ # including KafkaAdminClient.java
635
+ group_description = response .groups [0 ]
636
+ error_code = group_description [0 ]
637
+ error_type = Errors .for_code (error_code )
638
+ # Java has the note: KAFKA-6789, we can retry based on the error code
639
+ if error_type is not Errors .NoError :
640
+ raise error_type (
641
+ "DescribeGroupsResponse failed with response '{}'."
642
+ .format (response ))
643
+ # TODO Java checks the group protocol type, and if consumer
644
+ # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
645
+ # the members' partition assignments... that hasn't yet been
646
+ # implemented here so just return the raw struct results
647
+ else :
648
+ raise NotImplementedError (
649
+ "Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient."
650
+ .format (response .API_VERSION ))
651
+ return group_description
652
+
605
653
def describe_consumer_groups (self , group_ids , group_coordinator_id = None ):
606
654
"""Describe a set of consumer groups.
607
655
@@ -622,51 +670,52 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
622
670
"""
623
671
group_descriptions = []
624
672
futures = []
625
- version = self ._matching_api_version (DescribeGroupsRequest )
626
673
for group_id in group_ids :
627
674
if group_coordinator_id is not None :
628
675
this_groups_coordinator_id = group_coordinator_id
629
676
else :
630
677
this_groups_coordinator_id = self ._find_group_coordinator_id (group_id )
631
-
632
- if version <= 1 :
633
- # Note: KAFKA-6788 A potential optimization is to group the
634
- # request per coordinator and send one request with a list of
635
- # all consumer groups. Java still hasn't implemented this
636
- # because the error checking is hard to get right when some
637
- # groups error and others don't.
638
- request = DescribeGroupsRequest [version ](groups = (group_id ,))
639
- futures .append (self ._send_request_to_node (this_groups_coordinator_id , request ))
640
- else :
641
- raise NotImplementedError (
642
- "Support for DescribeGroups v{} has not yet been added to KafkaAdminClient."
643
- .format (version ))
678
+ f = self ._describe_consumer_groups_send_request (group_id , this_groups_coordinator_id )
679
+ futures .append (f )
644
680
645
681
self ._wait_for_futures (futures )
646
682
647
683
for future in futures :
648
684
response = future .value
649
- assert len (response .groups ) == 1
650
- # TODO need to implement converting the response tuple into
651
- # a more accessible interface like a namedtuple and then stop
652
- # hardcoding tuple indices here. Several Java examples,
653
- # including KafkaAdminClient.java
654
- group_description = response .groups [0 ]
655
- error_code = group_description [0 ]
656
- error_type = Errors .for_code (error_code )
657
- # Java has the note: KAFKA-6789, we can retry based on the error code
658
- if error_type is not Errors .NoError :
659
- raise error_type (
660
- "Request '{}' failed with response '{}'."
661
- .format (request , response ))
662
- # TODO Java checks the group protocol type, and if consumer
663
- # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
664
- # the members' partition assignments... that hasn't yet been
665
- # implemented here so just return the raw struct results
685
+ group_description = self ._describe_consumer_groups_process_response (response )
666
686
group_descriptions .append (group_description )
667
687
668
688
return group_descriptions
669
689
690
+ def _list_consumer_groups_send_request (self , broker_id ):
691
+ """Send a ListGroupsRequest to a broker.
692
+
693
+ :param broker_id: The broker's node_id.
694
+ :return: A message future
695
+ """
696
+ version = self ._matching_api_version (ListGroupsRequest )
697
+ if version <= 2 :
698
+ request = ListGroupsRequest [version ]()
699
+ else :
700
+ raise NotImplementedError (
701
+ "Support for ListGroupsRequest_v{} has not yet been added to KafkaAdminClient."
702
+ .format (version ))
703
+ return self ._send_request_to_node (broker_id , request )
704
+
705
+ def _list_consumer_groups_process_response (self , response ):
706
+ """Process a ListGroupsResponse into a list of groups."""
707
+ if response .API_VERSION <= 2 :
708
+ error_type = Errors .for_code (response .error_code )
709
+ if error_type is not Errors .NoError :
710
+ raise error_type (
711
+ "ListGroupsRequest failed with response '{}'."
712
+ .format (response ))
713
+ else :
714
+ raise NotImplementedError (
715
+ "Support for ListGroupsResponse_v{} has not yet been added to KafkaAdminClient."
716
+ .format (response .API_VERSION ))
717
+ return response .groups
718
+
670
719
def list_consumer_groups (self , broker_ids = None ):
671
720
"""List all consumer groups known to the cluster.
672
721
@@ -692,65 +741,26 @@ def list_consumer_groups(self, broker_ids=None):
692
741
:exception GroupLoadInProgressError: The coordinator is loading and
693
742
hence can't process requests.
694
743
"""
744
+ if broker_ids is None :
745
+ broker_ids = [broker .nodeId for broker in self ._client .cluster .brokers ()]
746
+ futures = [self ._list_consumer_groups_send_request (b ) for b in broker_ids ]
747
+ self ._wait_for_futures (futures )
695
748
# While we return a list, internally use a set to prevent duplicates
696
749
# because if a group coordinator fails after being queried, and its
697
750
# consumer groups move to new brokers that haven't yet been queried,
698
751
# then the same group could be returned by multiple brokers.
699
- consumer_groups = set ()
700
- futures = []
701
- if broker_ids is None :
702
- broker_ids = [broker .nodeId for broker in self ._client .cluster .brokers ()]
703
- version = self ._matching_api_version (ListGroupsRequest )
704
- if version <= 2 :
705
- request = ListGroupsRequest [version ]()
706
- for broker_id in broker_ids :
707
- futures .append (self ._send_request_to_node (broker_id , request ))
708
-
709
- self ._wait_for_futures (futures )
710
-
711
- for future in futures :
712
- response = future .value
713
- error_type = Errors .for_code (response .error_code )
714
- if error_type is not Errors .NoError :
715
- raise error_type (
716
- "Request '{}' failed with response '{}'."
717
- .format (request , response ))
718
- consumer_groups .update (response .groups )
719
- else :
720
- raise NotImplementedError (
721
- "Support for ListGroups v{} has not yet been added to KafkaAdminClient."
722
- .format (version ))
752
+ consumer_groups = {self ._list_consumer_groups_process_response (f .value ) for f in futures }
723
753
return list (consumer_groups )
724
754
725
- def list_consumer_group_offsets (self , group_id , group_coordinator_id = None ,
726
- partitions = None ):
727
- """Fetch Consumer Group Offsets.
728
-
729
- Note:
730
- This does not verify that the group_id or partitions actually exist
731
- in the cluster.
732
-
733
- As soon as any error is encountered, it is immediately raised.
755
+ def _list_consumer_group_offsets_send_request (self , group_id ,
756
+ group_coordinator_id , partitions = None ):
757
+ """Send an OffsetFetchRequest to a broker.
734
758
735
759
:param group_id: The consumer group id name for which to fetch offsets.
736
760
:param group_coordinator_id: The node_id of the group's coordinator
737
- broker. If set to None, will query the cluster to find the group
738
- coordinator. Explicitly specifying this can be useful to prevent
739
- that extra network round trip if you already know the group
740
- coordinator. Default: None.
741
- :param partitions: A list of TopicPartitions for which to fetch
742
- offsets. On brokers >= 0.10.2, this can be set to None to fetch all
743
- known offsets for the consumer group. Default: None.
744
- :return dictionary: A dictionary with TopicPartition keys and
745
- OffsetAndMetada values. Partitions that are not specified and for
746
- which the group_id does not have a recorded offset are omitted. An
747
- offset value of `-1` indicates the group_id has no offset for that
748
- TopicPartition. A `-1` can only happen for partitions that are
749
- explicitly specified.
761
+ broker.
762
+ :return: A message future
750
763
"""
751
- group_offsets_listing = {}
752
- if group_coordinator_id is None :
753
- group_coordinator_id = self ._find_group_coordinator_id (group_id )
754
764
version = self ._matching_api_version (OffsetFetchRequest )
755
765
if version <= 3 :
756
766
if partitions is None :
@@ -768,32 +778,80 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
768
778
topics_partitions_dict [topic ].add (partition )
769
779
topics_partitions = list (six .iteritems (topics_partitions_dict ))
770
780
request = OffsetFetchRequest [version ](group_id , topics_partitions )
771
- future = self ._send_request_to_node (group_coordinator_id , request )
772
- self ._wait_for_futures ([future ])
773
- response = future .value
781
+ else :
782
+ raise NotImplementedError (
783
+ "Support for OffsetFetchRequest_v{} has not yet been added to KafkaAdminClient."
784
+ .format (version ))
785
+ return self ._send_request_to_node (group_coordinator_id , request )
774
786
775
- if version > 1 : # OffsetFetchResponse_v1 lacks a top-level error_code
787
+ def _list_consumer_group_offsets_process_response (self , response ):
788
+ """Process an OffsetFetchResponse.
789
+
790
+ :param response: an OffsetFetchResponse.
791
+ :return: A dictionary composed of TopicPartition keys and
792
+ OffsetAndMetada values.
793
+ """
794
+ if response .API_VERSION <= 3 :
795
+
796
+ # OffsetFetchResponse_v1 lacks a top-level error_code
797
+ if response .API_VERSION > 1 :
776
798
error_type = Errors .for_code (response .error_code )
777
799
if error_type is not Errors .NoError :
778
800
# optionally we could retry if error_type.retriable
779
801
raise error_type (
780
- "Request '{}' failed with response '{}'."
781
- .format (request , response ))
802
+ "OffsetFetchResponse failed with response '{}'."
803
+ .format (response ))
804
+
782
805
# transform response into a dictionary with TopicPartition keys and
783
806
# OffsetAndMetada values--this is what the Java AdminClient returns
807
+ offsets = {}
784
808
for topic , partitions in response .topics :
785
809
for partition , offset , metadata , error_code in partitions :
786
810
error_type = Errors .for_code (error_code )
787
811
if error_type is not Errors .NoError :
788
812
raise error_type (
789
- "Unable to fetch offsets for group_id {}, topic {}, partition {}"
790
- .format (group_id , topic , partition ))
791
- group_offsets_listing [TopicPartition (topic , partition )] = OffsetAndMetadata (offset , metadata )
813
+ "Unable to fetch consumer group offsets for topic {}, partition {}"
814
+ .format (topic , partition ))
815
+ offsets [TopicPartition (topic , partition )] = OffsetAndMetadata (offset , metadata )
792
816
else :
793
817
raise NotImplementedError (
794
- "Support for OffsetFetch v{} has not yet been added to KafkaAdminClient."
795
- .format (version ))
796
- return group_offsets_listing
818
+ "Support for OffsetFetchResponse_v{} has not yet been added to KafkaAdminClient."
819
+ .format (response .API_VERSION ))
820
+ return offsets
821
+
822
+ def list_consumer_group_offsets (self , group_id , group_coordinator_id = None ,
823
+ partitions = None ):
824
+ """Fetch Consumer Offsets for a single consumer group.
825
+
826
+ Note:
827
+ This does not verify that the group_id or partitions actually exist
828
+ in the cluster.
829
+
830
+ As soon as any error is encountered, it is immediately raised.
831
+
832
+ :param group_id: The consumer group id name for which to fetch offsets.
833
+ :param group_coordinator_id: The node_id of the group's coordinator
834
+ broker. If set to None, will query the cluster to find the group
835
+ coordinator. Explicitly specifying this can be useful to prevent
836
+ that extra network round trip if you already know the group
837
+ coordinator. Default: None.
838
+ :param partitions: A list of TopicPartitions for which to fetch
839
+ offsets. On brokers >= 0.10.2, this can be set to None to fetch all
840
+ known offsets for the consumer group. Default: None.
841
+ :return dictionary: A dictionary with TopicPartition keys and
842
+ OffsetAndMetada values. Partitions that are not specified and for
843
+ which the group_id does not have a recorded offset are omitted. An
844
+ offset value of `-1` indicates the group_id has no offset for that
845
+ TopicPartition. A `-1` can only happen for partitions that are
846
+ explicitly specified.
847
+ """
848
+ if group_coordinator_id is None :
849
+ group_coordinator_id = self ._find_group_coordinator_id (group_id )
850
+ future = self ._list_consumer_group_offsets_send_request (
851
+ group_id , group_coordinator_id , partitions )
852
+ self ._wait_for_futures ([future ])
853
+ response = future .value
854
+ return self ._list_consumer_group_offsets_process_response (response )
797
855
798
856
# delete groups protocol not yet implemented
799
857
# Note: send the request to the group's coordinator.
0 commit comments