From f61418c68df17eb71b6347c05e453b396b1e1533 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 20 Jun 2025 15:45:33 -0700 Subject: [PATCH 1/8] kafka.admin cli --- kafka/admin/__main__.py | 7 ++ kafka/admin/config_resource.py | 6 ++ kafka/admin/new_topic.py | 9 +-- kafka/cli/__init__.py | 0 kafka/cli/admin/__init__.py | 109 +++++++++++++++++++++++++++ kafka/cli/admin/create_topic.py | 18 +++++ kafka/cli/admin/delete_topic.py | 10 +++ kafka/cli/admin/describe_cluster.py | 9 +++ kafka/cli/admin/describe_configs.py | 24 ++++++ kafka/cli/admin/describe_log_dirs.py | 9 +++ kafka/cli/admin/describe_topics.py | 10 +++ kafka/cli/admin/list_topics.py | 9 +++ 12 files changed, 213 insertions(+), 7 deletions(-) create mode 100644 kafka/admin/__main__.py create mode 100644 kafka/cli/__init__.py create mode 100644 kafka/cli/admin/__init__.py create mode 100644 kafka/cli/admin/create_topic.py create mode 100644 kafka/cli/admin/delete_topic.py create mode 100644 kafka/cli/admin/describe_cluster.py create mode 100644 kafka/cli/admin/describe_configs.py create mode 100644 kafka/cli/admin/describe_log_dirs.py create mode 100644 kafka/cli/admin/describe_topics.py create mode 100644 kafka/cli/admin/list_topics.py diff --git a/kafka/admin/__main__.py b/kafka/admin/__main__.py new file mode 100644 index 000000000..776063869 --- /dev/null +++ b/kafka/admin/__main__.py @@ -0,0 +1,7 @@ +from __future__ import absolute_import + +import sys + +from kafka.cli.admin import run_cli + +sys.exit(run_cli()) diff --git a/kafka/admin/config_resource.py b/kafka/admin/config_resource.py index e3294c9c4..06754ba9f 100644 --- a/kafka/admin/config_resource.py +++ b/kafka/admin/config_resource.py @@ -34,3 +34,9 @@ def __init__( self.resource_type = resource_type self.name = name self.configs = configs + + def __str__(self): + return "ConfigResource %s=%s" % (self.resource_type, self.name) + + def __repr__(self): + return "ConfigResource(%s, %s, %s)" % (self.resource_type, self.name, self.configs) diff --git a/kafka/admin/new_topic.py b/kafka/admin/new_topic.py index 645ac383a..e43c52226 100644 --- a/kafka/admin/new_topic.py +++ b/kafka/admin/new_topic.py @@ -1,7 +1,5 @@ from __future__ import absolute_import -from kafka.errors import IllegalArgumentError - class NewTopic(object): """ A class for new topic creation @@ -16,17 +14,14 @@ class NewTopic(object): topic_configs (dict of str: str): A mapping of config key and value for the topic. """ - def __init__( self, name, - num_partitions, - replication_factor, + num_partitions=-1, + replication_factor=-1, replica_assignments=None, topic_configs=None, ): - if not (num_partitions == -1 or replication_factor == -1) ^ (replica_assignments is None): - raise IllegalArgumentError('either num_partitions/replication_factor or replica_assignment must be specified') self.name = name self.num_partitions = num_partitions self.replication_factor = replication_factor diff --git a/kafka/cli/__init__.py b/kafka/cli/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py new file mode 100644 index 000000000..71dcfce1f --- /dev/null +++ b/kafka/cli/admin/__init__.py @@ -0,0 +1,109 @@ +from __future__ import absolute_import + +import argparse +import json +import logging +from pprint import pprint + +from kafka.admin.client import KafkaAdminClient +from .describe_cluster import DescribeCluster +from .describe_log_dirs import DescribeLogDirs +from .create_topic import CreateTopic +from .delete_topic import DeleteTopic +from .describe_topics import DescribeTopics +from .list_topics import ListTopics +from .describe_configs import DescribeConfigs + + # describe_acls + # create_acls + # delete_acls + + # alter_configs + # IncrementalAlterConfigs (not supported yet) + + # create_partitions + # AlterPartitionReassignments (not supported yet) + # ListPartitionReassignments (not supported yet) + + # delete_records + # OffsetDelete (not supported yet) + + # describe_consumer_groups + # list_consumer_groups + # list_consumer_group_offsets + # delete_consumer_groups + # delete_consumer_group_offsets (not supported yet) + # remove_members_from_consumer_group (not supported yet) + # alter_consumer_group_offsets (not supported yet) + + # list_offsets (not supported yet) + + # perform_leader_election + + # describe_log_dirs (currently broken) + # AlterReplicaLogDirs (not supported yet) + + # DescribeClientQuotas (not supported yet) + # AlterClientQuotas (not supported yet) + + # DescribeQuorum (not supported yet) + + # DescribeProducers (not supported yet) + # DescribeTransactions (not supported yet) + # ListTransactions (not supported yet) + # abort_transactin (not supported yet) + + # DescribeTopicPartitions (not supported yet) + # DescribeFeatures (not supported yet) + # UpdateFeatures (not supported yet) + + # api_versions + + +def main_parser(): + parser = argparse.ArgumentParser( + prog='kafka-admin-client', + description='Kafka admin client', + ) + parser.add_argument( + '-b', '--bootstrap-servers', type=str, action='append', required=True, + help='host:port for cluster bootstrap servers') + parser.add_argument( + '-l', '--log-level', type=str, + help='logging level, passed to logging.basicConfig') + parser.add_argument( + '-f', '--format', type=str, default='raw', + help='output format: raw|json') + return parser + + +_LOGGING_LEVELS = {'NOTSET': 0, 'DEBUG': 10, 'INFO': 20, 'WARNING': 30, 'ERROR': 40, 'CRITICAL': 50} + + +def run_cli(args=None): + parser = main_parser() + subparsers = parser.add_subparsers(help='subcommands') + for cmd in [DescribeCluster, DescribeConfigs, DescribeLogDirs, ListTopics, DescribeTopics, CreateTopic, DeleteTopic]: + cmd.add_subparser(subparsers) + + config = parser.parse_args(args) + if config.log_level: + logging.basicConfig(level=_LOGGING_LEVELS[config.log_level]) + if config.format not in ('raw', 'json'): + raise ValueError('Unrecognized format: %s' % config.format) + + client = KafkaAdminClient(bootstrap_servers=config.bootstrap_servers) + try: + result = config.command(client, config) + if config.format == 'raw': + pprint(result) + elif config.format == 'json': + print(json.dumps(result)) + return 0 + except Exception: + logging.exception('Error!') + return 1 + +if __name__ == '__main__': + import sys + sys.exit(run_cli()) diff --git a/kafka/cli/admin/create_topic.py b/kafka/cli/admin/create_topic.py new file mode 100644 index 000000000..4df6fe703 --- /dev/null +++ b/kafka/cli/admin/create_topic.py @@ -0,0 +1,18 @@ +from __future__ import absolute_import + +from kafka.admin.new_topic import NewTopic + + +class CreateTopic: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('create-topic', help='Create a Kafka Topic') + parser.add_argument('-t', '--topic', type=str, required=True) + parser.add_argument('--num-partitions', type=int, default=-1) + parser.add_argument('--replication-factor', type=int, default=-1) + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + return client.create_topics([NewTopic(args.topic, args.num_partitions, args.replication_factor)]) diff --git a/kafka/cli/admin/delete_topic.py b/kafka/cli/admin/delete_topic.py new file mode 100644 index 000000000..2ebfc4ad6 --- /dev/null +++ b/kafka/cli/admin/delete_topic.py @@ -0,0 +1,10 @@ +from __future__ import absolute_import + + +class DeleteTopic: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('delete-topic', help='Delete Kafka Topic') + parser.add_argument('-t', '--topic', type=str, required=True) + parser.set_defaults(command=lambda cli, args: cli.delete_topics([args.topic])) diff --git a/kafka/cli/admin/describe_cluster.py b/kafka/cli/admin/describe_cluster.py new file mode 100644 index 000000000..9f4b699b8 --- /dev/null +++ b/kafka/cli/admin/describe_cluster.py @@ -0,0 +1,9 @@ +from __future__ import absolute_import + + +class DescribeCluster: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('describe-cluster', help='Describe Kafka Cluster') + parser.set_defaults(command=lambda cli, _args: cli.describe_cluster()) diff --git a/kafka/cli/admin/describe_configs.py b/kafka/cli/admin/describe_configs.py new file mode 100644 index 000000000..2d9d714dc --- /dev/null +++ b/kafka/cli/admin/describe_configs.py @@ -0,0 +1,24 @@ +from __future__ import absolute_import + +from kafka.admin.config_resource import ConfigResource + + +class DescribeConfigs: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('describe-configs', help='Describe Kafka Configs') + parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', default=[]) + parser.add_argument('-b', '--broker', type=str, action='append', dest='brokers', default=[]) + parser.set_defaults(command=cls.command) + + @classmethod + def command(cls, client, args): + resources = [] + for topic in args.topics: + resources.append(ConfigResource('TOPIC', topic)) + for broker in args.brokers: + resources.append(ConfigResource('BROKER', broker)) + + response = client.describe_configs(resources) + return list(zip(resources, [{str(vals[0]): vals[1] for vals in r.resources[0][4]} for r in response])) diff --git a/kafka/cli/admin/describe_log_dirs.py b/kafka/cli/admin/describe_log_dirs.py new file mode 100644 index 000000000..873bc87cc --- /dev/null +++ b/kafka/cli/admin/describe_log_dirs.py @@ -0,0 +1,9 @@ +from __future__ import absolute_import + + +class DescribeLogDirs: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('describe-log-dirs', help='Get topic log directories for brokers') + parser.set_defaults(command=lambda cli, _args: cli.describe_log_dirs()) diff --git a/kafka/cli/admin/describe_topics.py b/kafka/cli/admin/describe_topics.py new file mode 100644 index 000000000..e6fedcd36 --- /dev/null +++ b/kafka/cli/admin/describe_topics.py @@ -0,0 +1,10 @@ +from __future__ import absolute_import + + +class DescribeTopics: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('describe-topics', help='Describe Kafka Topics') + parser.add_argument('-t', '--topic', type=str, action='append', dest='topics') + parser.set_defaults(command=lambda cli, args: cli.describe_topics(args.topics or None)) diff --git a/kafka/cli/admin/list_topics.py b/kafka/cli/admin/list_topics.py new file mode 100644 index 000000000..60b32db5a --- /dev/null +++ b/kafka/cli/admin/list_topics.py @@ -0,0 +1,9 @@ +from __future__ import absolute_import + + +class ListTopics: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('list-topics', help='List Kafka Topics') + parser.set_defaults(command=lambda cli, _args: cli.list_topics()) From 2dfed325cabe3ffbaa471e51023b1fd5d8187c3e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 23 Jun 2025 11:59:53 -0700 Subject: [PATCH 2/8] break into subcommand dirs --- kafka/cli/admin/__init__.py | 130 ++++++++++-------- kafka/cli/admin/cluster/__init__.py | 16 +++ .../describe.py} | 2 +- kafka/cli/admin/configs/__init__.py | 16 +++ .../describe.py} | 4 +- kafka/cli/admin/consumer_groups/__init__.py | 16 +++ kafka/cli/admin/consumer_groups/list.py | 9 ++ kafka/cli/admin/log_dirs/__init__.py | 16 +++ .../describe.py} | 2 +- kafka/cli/admin/topics/__init__.py | 19 +++ .../{create_topic.py => topics/create.py} | 2 +- .../{delete_topic.py => topics/delete.py} | 2 +- .../describe.py} | 2 +- .../admin/{list_topics.py => topics/list.py} | 2 +- 14 files changed, 175 insertions(+), 63 deletions(-) create mode 100644 kafka/cli/admin/cluster/__init__.py rename kafka/cli/admin/{describe_cluster.py => cluster/describe.py} (69%) create mode 100644 kafka/cli/admin/configs/__init__.py rename kafka/cli/admin/{describe_configs.py => configs/describe.py} (76%) create mode 100644 kafka/cli/admin/consumer_groups/__init__.py create mode 100644 kafka/cli/admin/consumer_groups/list.py create mode 100644 kafka/cli/admin/log_dirs/__init__.py rename kafka/cli/admin/{describe_log_dirs.py => log_dirs/describe.py} (65%) create mode 100644 kafka/cli/admin/topics/__init__.py rename kafka/cli/admin/{create_topic.py => topics/create.py} (87%) rename kafka/cli/admin/{delete_topic.py => topics/delete.py} (77%) rename kafka/cli/admin/{describe_topics.py => topics/describe.py} (77%) rename kafka/cli/admin/{list_topics.py => topics/list.py} (70%) diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 71dcfce1f..5ba311078 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -6,63 +6,15 @@ from pprint import pprint from kafka.admin.client import KafkaAdminClient -from .describe_cluster import DescribeCluster -from .describe_log_dirs import DescribeLogDirs -from .create_topic import CreateTopic -from .delete_topic import DeleteTopic -from .describe_topics import DescribeTopics -from .list_topics import ListTopics -from .describe_configs import DescribeConfigs - - # describe_acls - # create_acls - # delete_acls - - # alter_configs - # IncrementalAlterConfigs (not supported yet) - - # create_partitions - # AlterPartitionReassignments (not supported yet) - # ListPartitionReassignments (not supported yet) - - # delete_records - # OffsetDelete (not supported yet) - - # describe_consumer_groups - # list_consumer_groups - # list_consumer_group_offsets - # delete_consumer_groups - # delete_consumer_group_offsets (not supported yet) - # remove_members_from_consumer_group (not supported yet) - # alter_consumer_group_offsets (not supported yet) - - # list_offsets (not supported yet) - - # perform_leader_election - - # describe_log_dirs (currently broken) - # AlterReplicaLogDirs (not supported yet) - - # DescribeClientQuotas (not supported yet) - # AlterClientQuotas (not supported yet) - - # DescribeQuorum (not supported yet) - - # DescribeProducers (not supported yet) - # DescribeTransactions (not supported yet) - # ListTransactions (not supported yet) - # abort_transactin (not supported yet) - - # DescribeTopicPartitions (not supported yet) - # DescribeFeatures (not supported yet) - # UpdateFeatures (not supported yet) - - # api_versions - +from .cluster import ClusterSubCommand +from .configs import ConfigsSubCommand +from .consumer_groups import ConsumerGroupsSubCommand +from .log_dirs import LogDirsSubCommand +from .topics import TopicsSubCommand def main_parser(): parser = argparse.ArgumentParser( - prog='kafka-admin-client', + prog='python -m kafka.admin', description='Kafka admin client', ) parser.add_argument( @@ -83,7 +35,8 @@ def main_parser(): def run_cli(args=None): parser = main_parser() subparsers = parser.add_subparsers(help='subcommands') - for cmd in [DescribeCluster, DescribeConfigs, DescribeLogDirs, ListTopics, DescribeTopics, CreateTopic, DeleteTopic]: + for cmd in [ClusterSubCommand, ConfigsSubCommand, LogDirsSubCommand, + TopicsSubCommand, ConsumerGroupsSubCommand]: cmd.add_subparser(subparsers) config = parser.parse_args(args) @@ -100,6 +53,9 @@ def run_cli(args=None): elif config.format == 'json': print(json.dumps(result)) return 0 + except AttributeError: + parser.print_help() + return 2 except Exception: logging.exception('Error!') return 1 @@ -107,3 +63,67 @@ def run_cli(args=None): if __name__ == '__main__': import sys sys.exit(run_cli()) + + +# Commands TODO: + # [acls] + # describe + # create + # delete + + # [configs] + # alter + # IncrementalAlterConfigs (not supported yet) + + # [partitions] + # create + # alter-reassignments (AlterPartitionReassignments - not supported yet) + # list-reassignments (ListPartitionReassignments - not supported yet) + + # records + # delete_records + + # [consumer-groups] + # describe + # delete + # remove-members (not supported yet) + # list-offsets + # delete-offsets (not supported yet) + # alter-offsets (not supported yet) + + # [offsets] + # list (not supported yet) + # delete (OffsetDelete - not supported yet) + + # leader-election + # perform_leader_election + + # [log-dirs] + # describe (currently broken) + # alter (AlterReplicaLogDirs - not supported yet) + + # [client-quotas] + # describe (DescribeClientQuotas - not supported yet) + # alter (AlterClientQuotas - not supported yet) + + # DescribeQuorum (not supported yet) + + # [producers] + # describe (DescribeProducers - not supported yet) + + # [transactions] + # describe (DescribeTransactions - not supported yet) + # list (ListTransactions - not supported yet) + # abort (not supported yet) + + # [topics] + # describe-partitions (DescribeTopicPartitions - not supported yet) + + # [cluster] + # describe-features (DescribeFeatures - not supported yet) + # update-features (UpdateFeatures - not supported yet) + # version + # api-versions + + + diff --git a/kafka/cli/admin/cluster/__init__.py b/kafka/cli/admin/cluster/__init__.py new file mode 100644 index 000000000..735228565 --- /dev/null +++ b/kafka/cli/admin/cluster/__init__.py @@ -0,0 +1,16 @@ +from __future__ import absolute_import + +import sys + +from .describe import DescribeCluster + + +class ClusterSubCommand: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('cluster', help='Manage Kafka Cluster') + commands = parser.add_subparsers() + for cmd in [DescribeCluster]: + cmd.add_subparser(commands) + parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/describe_cluster.py b/kafka/cli/admin/cluster/describe.py similarity index 69% rename from kafka/cli/admin/describe_cluster.py rename to kafka/cli/admin/cluster/describe.py index 9f4b699b8..6a2ff06e5 100644 --- a/kafka/cli/admin/describe_cluster.py +++ b/kafka/cli/admin/cluster/describe.py @@ -5,5 +5,5 @@ class DescribeCluster: @classmethod def add_subparser(cls, subparsers): - parser = subparsers.add_parser('describe-cluster', help='Describe Kafka Cluster') + parser = subparsers.add_parser('describe', help='Describe Kafka Cluster') parser.set_defaults(command=lambda cli, _args: cli.describe_cluster()) diff --git a/kafka/cli/admin/configs/__init__.py b/kafka/cli/admin/configs/__init__.py new file mode 100644 index 000000000..7ec6d1042 --- /dev/null +++ b/kafka/cli/admin/configs/__init__.py @@ -0,0 +1,16 @@ +from __future__ import absolute_import + +import sys + +from .describe import DescribeConfigs + + +class ConfigsSubCommand: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('configs', help='Manage Kafka Configuration') + commands = parser.add_subparsers() + for cmd in [DescribeConfigs]: + cmd.add_subparser(commands) + parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/describe_configs.py b/kafka/cli/admin/configs/describe.py similarity index 76% rename from kafka/cli/admin/describe_configs.py rename to kafka/cli/admin/configs/describe.py index 2d9d714dc..3ff366667 100644 --- a/kafka/cli/admin/describe_configs.py +++ b/kafka/cli/admin/configs/describe.py @@ -7,7 +7,7 @@ class DescribeConfigs: @classmethod def add_subparser(cls, subparsers): - parser = subparsers.add_parser('describe-configs', help='Describe Kafka Configs') + parser = subparsers.add_parser('describe', help='Describe Kafka Configs') parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', default=[]) parser.add_argument('-b', '--broker', type=str, action='append', dest='brokers', default=[]) parser.set_defaults(command=cls.command) @@ -21,4 +21,4 @@ def command(cls, client, args): resources.append(ConfigResource('BROKER', broker)) response = client.describe_configs(resources) - return list(zip(resources, [{str(vals[0]): vals[1] for vals in r.resources[0][4]} for r in response])) + return list(zip([(r.resource_type.name, r.name) for r in resources], [{str(vals[0]): vals[1] for vals in r.resources[0][4]} for r in response])) diff --git a/kafka/cli/admin/consumer_groups/__init__.py b/kafka/cli/admin/consumer_groups/__init__.py new file mode 100644 index 000000000..a1bec6c19 --- /dev/null +++ b/kafka/cli/admin/consumer_groups/__init__.py @@ -0,0 +1,16 @@ +from __future__ import absolute_import + +import sys + +from .list import ListConsumerGroups + + +class ConsumerGroupsSubCommand: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('consumer-groups', help='Manage Kafka Consumer Groups') + commands = parser.add_subparsers() + for cmd in [ListConsumerGroups]: + cmd.add_subparser(commands) + parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/consumer_groups/list.py b/kafka/cli/admin/consumer_groups/list.py new file mode 100644 index 000000000..6c02f3bee --- /dev/null +++ b/kafka/cli/admin/consumer_groups/list.py @@ -0,0 +1,9 @@ +from __future__ import absolute_import + + +class ListConsumerGroups: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('list', help='List Consumer Groups') + parser.set_defaults(command=lambda cli, _args: cli.list_consumer_groups()) diff --git a/kafka/cli/admin/log_dirs/__init__.py b/kafka/cli/admin/log_dirs/__init__.py new file mode 100644 index 000000000..3a4a94351 --- /dev/null +++ b/kafka/cli/admin/log_dirs/__init__.py @@ -0,0 +1,16 @@ +from __future__ import absolute_import + +import sys + +from .describe import DescribeLogDirs + + +class LogDirsSubCommand: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('log-dirs', help='Manage Kafka Topic/Partition Log Directories') + commands = parser.add_subparsers() + for cmd in [DescribeLogDirs]: + cmd.add_subparser(commands) + parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/describe_log_dirs.py b/kafka/cli/admin/log_dirs/describe.py similarity index 65% rename from kafka/cli/admin/describe_log_dirs.py rename to kafka/cli/admin/log_dirs/describe.py index 873bc87cc..a0c301f82 100644 --- a/kafka/cli/admin/describe_log_dirs.py +++ b/kafka/cli/admin/log_dirs/describe.py @@ -5,5 +5,5 @@ class DescribeLogDirs: @classmethod def add_subparser(cls, subparsers): - parser = subparsers.add_parser('describe-log-dirs', help='Get topic log directories for brokers') + parser = subparsers.add_parser('describe', help='Get topic log directories for brokers') parser.set_defaults(command=lambda cli, _args: cli.describe_log_dirs()) diff --git a/kafka/cli/admin/topics/__init__.py b/kafka/cli/admin/topics/__init__.py new file mode 100644 index 000000000..dbf569445 --- /dev/null +++ b/kafka/cli/admin/topics/__init__.py @@ -0,0 +1,19 @@ +from __future__ import absolute_import + +import sys + +from .create import CreateTopic +from .delete import DeleteTopic +from .describe import DescribeTopics +from .list import ListTopics + + +class TopicsSubCommand: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('topics', help='List/Describe/Create/Delete Kafka Topics') + commands = parser.add_subparsers() + for cmd in [ListTopics, DescribeTopics, CreateTopic, DeleteTopic]: + cmd.add_subparser(commands) + parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/create_topic.py b/kafka/cli/admin/topics/create.py similarity index 87% rename from kafka/cli/admin/create_topic.py rename to kafka/cli/admin/topics/create.py index 4df6fe703..d033f6b30 100644 --- a/kafka/cli/admin/create_topic.py +++ b/kafka/cli/admin/topics/create.py @@ -7,7 +7,7 @@ class CreateTopic: @classmethod def add_subparser(cls, subparsers): - parser = subparsers.add_parser('create-topic', help='Create a Kafka Topic') + parser = subparsers.add_parser('create', help='Create a Kafka Topic') parser.add_argument('-t', '--topic', type=str, required=True) parser.add_argument('--num-partitions', type=int, default=-1) parser.add_argument('--replication-factor', type=int, default=-1) diff --git a/kafka/cli/admin/delete_topic.py b/kafka/cli/admin/topics/delete.py similarity index 77% rename from kafka/cli/admin/delete_topic.py rename to kafka/cli/admin/topics/delete.py index 2ebfc4ad6..a88400ef2 100644 --- a/kafka/cli/admin/delete_topic.py +++ b/kafka/cli/admin/topics/delete.py @@ -5,6 +5,6 @@ class DeleteTopic: @classmethod def add_subparser(cls, subparsers): - parser = subparsers.add_parser('delete-topic', help='Delete Kafka Topic') + parser = subparsers.add_parser('delete', help='Delete Kafka Topic') parser.add_argument('-t', '--topic', type=str, required=True) parser.set_defaults(command=lambda cli, args: cli.delete_topics([args.topic])) diff --git a/kafka/cli/admin/describe_topics.py b/kafka/cli/admin/topics/describe.py similarity index 77% rename from kafka/cli/admin/describe_topics.py rename to kafka/cli/admin/topics/describe.py index e6fedcd36..2e96871d6 100644 --- a/kafka/cli/admin/describe_topics.py +++ b/kafka/cli/admin/topics/describe.py @@ -5,6 +5,6 @@ class DescribeTopics: @classmethod def add_subparser(cls, subparsers): - parser = subparsers.add_parser('describe-topics', help='Describe Kafka Topics') + parser = subparsers.add_parser('describe', help='Describe Kafka Topics') parser.add_argument('-t', '--topic', type=str, action='append', dest='topics') parser.set_defaults(command=lambda cli, args: cli.describe_topics(args.topics or None)) diff --git a/kafka/cli/admin/list_topics.py b/kafka/cli/admin/topics/list.py similarity index 70% rename from kafka/cli/admin/list_topics.py rename to kafka/cli/admin/topics/list.py index 60b32db5a..2dbf3828e 100644 --- a/kafka/cli/admin/list_topics.py +++ b/kafka/cli/admin/topics/list.py @@ -5,5 +5,5 @@ class ListTopics: @classmethod def add_subparser(cls, subparsers): - parser = subparsers.add_parser('list-topics', help='List Kafka Topics') + parser = subparsers.add_parser('list', help='List Kafka Topics') parser.set_defaults(command=lambda cli, _args: cli.list_topics()) From b688d80e63f3d2d76b818c378cdc895df2b31887 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 23 Jun 2025 12:47:16 -0700 Subject: [PATCH 3/8] Fill out consumer-groups commands --- kafka/cli/admin/consumer_groups/__init__.py | 5 ++++- kafka/cli/admin/consumer_groups/delete.py | 10 ++++++++++ kafka/cli/admin/consumer_groups/describe.py | 10 ++++++++++ kafka/cli/admin/consumer_groups/list_offsets.py | 10 ++++++++++ 4 files changed, 34 insertions(+), 1 deletion(-) create mode 100644 kafka/cli/admin/consumer_groups/delete.py create mode 100644 kafka/cli/admin/consumer_groups/describe.py create mode 100644 kafka/cli/admin/consumer_groups/list_offsets.py diff --git a/kafka/cli/admin/consumer_groups/__init__.py b/kafka/cli/admin/consumer_groups/__init__.py index a1bec6c19..cfb1bdb4f 100644 --- a/kafka/cli/admin/consumer_groups/__init__.py +++ b/kafka/cli/admin/consumer_groups/__init__.py @@ -2,7 +2,10 @@ import sys +from .delete import DeleteConsumerGroups +from .describe import DescribeConsumerGroups from .list import ListConsumerGroups +from .list_offsets import ListConsumerGroupOffsets class ConsumerGroupsSubCommand: @@ -11,6 +14,6 @@ class ConsumerGroupsSubCommand: def add_subparser(cls, subparsers): parser = subparsers.add_parser('consumer-groups', help='Manage Kafka Consumer Groups') commands = parser.add_subparsers() - for cmd in [ListConsumerGroups]: + for cmd in [ListConsumerGroups, DescribeConsumerGroups, ListConsumerGroupOffsets, DeleteConsumerGroups]: cmd.add_subparser(commands) parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) diff --git a/kafka/cli/admin/consumer_groups/delete.py b/kafka/cli/admin/consumer_groups/delete.py new file mode 100644 index 000000000..5724ae551 --- /dev/null +++ b/kafka/cli/admin/consumer_groups/delete.py @@ -0,0 +1,10 @@ +from __future__ import absolute_import + + +class DeleteConsumerGroups: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('delete', help='Delete Consumer Groups') + parser.add_argument('-g', '--group-id', type=str, action='append', dest='groups', required=True) + parser.set_defaults(command=lambda cli, args: cli.delete_consumer_groups(args.groups)) diff --git a/kafka/cli/admin/consumer_groups/describe.py b/kafka/cli/admin/consumer_groups/describe.py new file mode 100644 index 000000000..02298e9a4 --- /dev/null +++ b/kafka/cli/admin/consumer_groups/describe.py @@ -0,0 +1,10 @@ +from __future__ import absolute_import + + +class DescribeConsumerGroups: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('describe', help='Describe Consumer Groups') + parser.add_argument('-g', '--group-id', type=str, action='append', dest='groups', required=True) + parser.set_defaults(command=lambda cli, args: cli.describe_consumer_groups(args.groups)) diff --git a/kafka/cli/admin/consumer_groups/list_offsets.py b/kafka/cli/admin/consumer_groups/list_offsets.py new file mode 100644 index 000000000..7c05c5aae --- /dev/null +++ b/kafka/cli/admin/consumer_groups/list_offsets.py @@ -0,0 +1,10 @@ +from __future__ import absolute_import + + +class ListConsumerGroupOffsets: + + @classmethod + def add_subparser(cls, subparsers): + parser = subparsers.add_parser('list-offsets', help='List Offsets for Consumer Group') + parser.add_argument('-g', '--group-id', type=str, required=True) + parser.set_defaults(command=lambda cli, args: cli.list_consumer_group_offsets(args.group_id)) From 22e88eb9310712a8d3f2e16aa22f62849f703545 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 23 Jun 2025 12:55:10 -0700 Subject: [PATCH 4/8] update todos; upper log_level --- kafka/cli/admin/__init__.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 5ba311078..af430d701 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -41,7 +41,7 @@ def run_cli(args=None): config = parser.parse_args(args) if config.log_level: - logging.basicConfig(level=_LOGGING_LEVELS[config.log_level]) + logging.basicConfig(level=_LOGGING_LEVELS[config.log_level.upper()]) if config.format not in ('raw', 'json'): raise ValueError('Unrecognized format: %s' % config.format) @@ -80,14 +80,11 @@ def run_cli(args=None): # alter-reassignments (AlterPartitionReassignments - not supported yet) # list-reassignments (ListPartitionReassignments - not supported yet) - # records - # delete_records + # [records] + # delete # [consumer-groups] - # describe - # delete # remove-members (not supported yet) - # list-offsets # delete-offsets (not supported yet) # alter-offsets (not supported yet) From 242d5cf2ae9ec549f9e61ba5bf3d3ba7b6603d87 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 23 Jun 2025 13:02:41 -0700 Subject: [PATCH 5/8] --extra-config for admin-client --- kafka/cli/admin/__init__.py | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index af430d701..02971e333 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -15,11 +15,14 @@ def main_parser(): parser = argparse.ArgumentParser( prog='python -m kafka.admin', - description='Kafka admin client', + description='Kafka admin client', ) parser.add_argument( '-b', '--bootstrap-servers', type=str, action='append', required=True, help='host:port for cluster bootstrap servers') + parser.add_argument( + '-c', '--extra-config', type=str, action='append', + help='additional configuration properties for admin client') parser.add_argument( '-l', '--log-level', type=str, help='logging level, passed to logging.basicConfig') @@ -32,6 +35,24 @@ def main_parser(): _LOGGING_LEVELS = {'NOTSET': 0, 'DEBUG': 10, 'INFO': 20, 'WARNING': 30, 'ERROR': 40, 'CRITICAL': 50} +def build_kwargs(props): + kwargs = {} + for prop in props or []: + k, v = prop.split('=') + try: + v = int(v) + except ValueError: + pass + if v == 'None': + v = None + elif v == 'False': + v = False + elif v == 'True': + v = True + kwargs[k] = v + return kwargs + + def run_cli(args=None): parser = main_parser() subparsers = parser.add_subparsers(help='subcommands') @@ -44,8 +65,10 @@ def run_cli(args=None): logging.basicConfig(level=_LOGGING_LEVELS[config.log_level.upper()]) if config.format not in ('raw', 'json'): raise ValueError('Unrecognized format: %s' % config.format) + logger = logging.getLogger(__name__) - client = KafkaAdminClient(bootstrap_servers=config.bootstrap_servers) + kwargs = build_kwargs(config.extra_config) + client = KafkaAdminClient(bootstrap_servers=config.bootstrap_servers, **kwargs) try: result = config.command(client, config) if config.format == 'raw': @@ -57,7 +80,7 @@ def run_cli(args=None): parser.print_help() return 2 except Exception: - logging.exception('Error!') + logger.exception('Error!') return 1 if __name__ == '__main__': From 0eed79c9a34d88b37526ef2b51ef71281e4301a6 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 23 Jun 2025 13:07:08 -0700 Subject: [PATCH 6/8] kafka.consumer cli --- kafka/cli/consumer/__init__.py | 90 ++++++++++++++++++++++++++++++++++ kafka/consumer/__main__.py | 7 +++ 2 files changed, 97 insertions(+) create mode 100644 kafka/cli/consumer/__init__.py create mode 100644 kafka/consumer/__main__.py diff --git a/kafka/cli/consumer/__init__.py b/kafka/cli/consumer/__init__.py new file mode 100644 index 000000000..7b1991075 --- /dev/null +++ b/kafka/cli/consumer/__init__.py @@ -0,0 +1,90 @@ +from __future__ import absolute_import, print_function + +import argparse +import logging + +from kafka import KafkaConsumer + + +def main_parser(): + parser = argparse.ArgumentParser( + prog='python -m kafka.consumer', + description='Kafka console consumer', + ) + parser.add_argument( + '-b', '--bootstrap-servers', type=str, action='append', required=True, + help='host:port for cluster bootstrap servers') + parser.add_argument( + '-t', '--topic', type=str, action='append', dest='topics', required=True, + help='subscribe to topic') + parser.add_argument( + '-g', '--group', type=str, required=True, + help='consumer group') + parser.add_argument( + '-c', '--extra-config', type=str, action='append', + help='additional configuration properties for kafka consumer') + parser.add_argument( + '-l', '--log-level', type=str, + help='logging level, passed to logging.basicConfig') + parser.add_argument( + '-f', '--format', type=str, default='str', + help='output format: str|raw|full') + parser.add_argument( + '--encoding', type=str, default='utf-8', help='encoding to use for str output decode()') + return parser + + +_LOGGING_LEVELS = {'NOTSET': 0, 'DEBUG': 10, 'INFO': 20, 'WARNING': 30, 'ERROR': 40, 'CRITICAL': 50} + + +def build_kwargs(props): + kwargs = {} + for prop in props or []: + k, v = prop.split('=') + try: + v = int(v) + except ValueError: + pass + if v == 'None': + v = None + elif v == 'False': + v = False + elif v == 'True': + v = True + kwargs[k] = v + return kwargs + + +def run_cli(args=None): + parser = main_parser() + config = parser.parse_args(args) + if config.log_level: + logging.basicConfig(level=_LOGGING_LEVELS[config.log_level.upper()]) + if config.format not in ('str', 'raw', 'full'): + raise ValueError('Unrecognized format: %s' % config.format) + logger = logging.getLogger(__name__) + + kwargs = build_kwargs(config.extra_config) + consumer = KafkaConsumer(bootstrap_servers=config.bootstrap_servers, group_id=config.group, **kwargs) + consumer.subscribe(config.topics) + try: + for m in consumer: + if config.format == 'str': + print(m.value.decode(config.encoding)) + elif config.format == 'full': + print(m) + else: + print(m.value) + except KeyboardInterrupt: + logger.info('Bye!') + return 0 + except Exception: + logger.exception('Error!') + return 1 + finally: + consumer.close() + + +if __name__ == '__main__': + import sys + sys.exit(run_cli()) diff --git a/kafka/consumer/__main__.py b/kafka/consumer/__main__.py new file mode 100644 index 000000000..0356a1aae --- /dev/null +++ b/kafka/consumer/__main__.py @@ -0,0 +1,7 @@ +from __future__ import absolute_import + +import sys + +from kafka.cli.consumer import run_cli + +sys.exit(run_cli()) From 60850b923d7a94cffe28d3cbaaade5352f74bd5c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 23 Jun 2025 13:42:11 -0700 Subject: [PATCH 7/8] kafka.producer cli --- kafka/cli/producer/__init__.py | 85 ++++++++++++++++++++++++++++++++++ kafka/producer/__main__.py | 7 +++ 2 files changed, 92 insertions(+) create mode 100644 kafka/cli/producer/__init__.py create mode 100644 kafka/producer/__main__.py diff --git a/kafka/cli/producer/__init__.py b/kafka/cli/producer/__init__.py new file mode 100644 index 000000000..186eae1c3 --- /dev/null +++ b/kafka/cli/producer/__init__.py @@ -0,0 +1,85 @@ +from __future__ import absolute_import, print_function + +import argparse +import logging + +from kafka import KafkaProducer + + +def main_parser(): + parser = argparse.ArgumentParser( + prog='python -m kafka.producer', + description='Kafka console producer', + ) + parser.add_argument( + '-b', '--bootstrap-servers', type=str, action='append', required=True, + help='host:port for cluster bootstrap servers') + parser.add_argument( + '-t', '--topic', type=str, required=True, + help='publish to topic') + parser.add_argument( + '-c', '--extra-config', type=str, action='append', + help='additional configuration properties for kafka producer') + parser.add_argument( + '-l', '--log-level', type=str, + help='logging level, passed to logging.basicConfig') + parser.add_argument( + '--encoding', type=str, default='utf-8', + help='byte encoding for produced messages') + return parser + + +_LOGGING_LEVELS = {'NOTSET': 0, 'DEBUG': 10, 'INFO': 20, 'WARNING': 30, 'ERROR': 40, 'CRITICAL': 50} + + +def build_kwargs(props): + kwargs = {} + for prop in props or []: + k, v = prop.split('=') + try: + v = int(v) + except ValueError: + pass + if v == 'None': + v = None + elif v == 'False': + v = False + elif v == 'True': + v = True + kwargs[k] = v + return kwargs + + +def run_cli(args=None): + parser = main_parser() + config = parser.parse_args(args) + if config.log_level: + logging.basicConfig(level=_LOGGING_LEVELS[config.log_level.upper()]) + logger = logging.getLogger(__name__) + + kwargs = build_kwargs(config.extra_config) + producer = KafkaProducer(bootstrap_servers=config.bootstrap_servers, **kwargs) + + def log_result(res_or_err): + if isinstance(res_or_err, Exception): + logger.error("Error producing message", exc_info=res_or_err) + else: + logger.info("Message produced: %s", res_or_err) + + try: + while True: + value = input() + producer.send(config.topic, value=value.encode(config.encoding)).add_both(log_result) + except KeyboardInterrupt: + logger.info('Bye!') + return 0 + except Exception: + logger.exception('Error!') + return 1 + finally: + producer.close() + + +if __name__ == '__main__': + import sys + sys.exit(run_cli()) diff --git a/kafka/producer/__main__.py b/kafka/producer/__main__.py new file mode 100644 index 000000000..e5fd1b1d1 --- /dev/null +++ b/kafka/producer/__main__.py @@ -0,0 +1,7 @@ +from __future__ import absolute_import + +import sys + +from kafka.cli.producer import run_cli + +sys.exit(run_cli()) From 1daa62e7fcc1fdf157a2f906ebbdb08a8599d38d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 23 Jun 2025 14:57:31 -0700 Subject: [PATCH 8/8] fixup admin new_topic tests --- test/test_admin.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/test/test_admin.py b/test/test_admin.py index cdb74242e..e6f7937a7 100644 --- a/test/test_admin.py +++ b/test/test_admin.py @@ -58,18 +58,34 @@ def test_acl_resource(): ) def test_new_topic(): - with pytest.raises(IllegalArgumentError): - _bad_topic = kafka.admin.NewTopic('foo', -1, -1) - with pytest.raises(IllegalArgumentError): - _bad_topic = kafka.admin.NewTopic('foo', 1, -1) - with pytest.raises(IllegalArgumentError): - _bad_topic = kafka.admin.NewTopic('foo', 1, 1, {1: [1, 1, 1]}) + good_topic = kafka.admin.NewTopic('foo') + assert good_topic.name == 'foo' + assert good_topic.num_partitions == -1 + assert good_topic.replication_factor == -1 + assert good_topic.replica_assignments == {} + assert good_topic.topic_configs == {} + + good_topic = kafka.admin.NewTopic('foo', 1) + assert good_topic.name == 'foo' + assert good_topic.num_partitions == 1 + assert good_topic.replication_factor == -1 + assert good_topic.replica_assignments == {} + assert good_topic.topic_configs == {} + + good_topic = kafka.admin.NewTopic('foo', 1, 1, {1: [1, 1, 1]}) + assert good_topic.name == 'foo' + assert good_topic.num_partitions == 1 + assert good_topic.replication_factor == 1 + assert good_topic.replica_assignments == {1: [1, 1, 1]} + assert good_topic.topic_configs == {} + good_topic = kafka.admin.NewTopic('foo', 1, 2) assert good_topic.name == 'foo' assert good_topic.num_partitions == 1 assert good_topic.replication_factor == 2 assert good_topic.replica_assignments == {} assert good_topic.topic_configs == {} + good_topic = kafka.admin.NewTopic('bar', -1, -1, {1: [1, 2, 3]}, {'key': 'value'}) assert good_topic.name == 'bar' assert good_topic.num_partitions == -1