Skip to content

python -m cli interfaces for kafka.admin, kafka.consumer, kafka.producer #2650

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions kafka/admin/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from __future__ import absolute_import

import sys

from kafka.cli.admin import run_cli

sys.exit(run_cli())
6 changes: 6 additions & 0 deletions kafka/admin/config_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,9 @@ def __init__(
self.resource_type = resource_type
self.name = name
self.configs = configs

def __str__(self):
return "ConfigResource %s=%s" % (self.resource_type, self.name)

def __repr__(self):
return "ConfigResource(%s, %s, %s)" % (self.resource_type, self.name, self.configs)
9 changes: 2 additions & 7 deletions kafka/admin/new_topic.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from __future__ import absolute_import

from kafka.errors import IllegalArgumentError


class NewTopic(object):
""" A class for new topic creation
Expand All @@ -16,17 +14,14 @@ class NewTopic(object):
topic_configs (dict of str: str): A mapping of config key
and value for the topic.
"""

def __init__(
self,
name,
num_partitions,
replication_factor,
num_partitions=-1,
replication_factor=-1,
replica_assignments=None,
topic_configs=None,
):
if not (num_partitions == -1 or replication_factor == -1) ^ (replica_assignments is None):
raise IllegalArgumentError('either num_partitions/replication_factor or replica_assignment must be specified')
self.name = name
self.num_partitions = num_partitions
self.replication_factor = replication_factor
Expand Down
Empty file added kafka/cli/__init__.py
Empty file.
149 changes: 149 additions & 0 deletions kafka/cli/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
from __future__ import absolute_import

import argparse
import json
import logging
from pprint import pprint

from kafka.admin.client import KafkaAdminClient
from .cluster import ClusterSubCommand
from .configs import ConfigsSubCommand
from .consumer_groups import ConsumerGroupsSubCommand
from .log_dirs import LogDirsSubCommand
from .topics import TopicsSubCommand

def main_parser():
parser = argparse.ArgumentParser(
prog='python -m kafka.admin',
description='Kafka admin client',
)
parser.add_argument(
'-b', '--bootstrap-servers', type=str, action='append', required=True,
help='host:port for cluster bootstrap servers')
parser.add_argument(
'-c', '--extra-config', type=str, action='append',
help='additional configuration properties for admin client')
parser.add_argument(
'-l', '--log-level', type=str,
help='logging level, passed to logging.basicConfig')
parser.add_argument(
'-f', '--format', type=str, default='raw',
help='output format: raw|json')
return parser


_LOGGING_LEVELS = {'NOTSET': 0, 'DEBUG': 10, 'INFO': 20, 'WARNING': 30, 'ERROR': 40, 'CRITICAL': 50}


def build_kwargs(props):
kwargs = {}
for prop in props or []:
k, v = prop.split('=')
try:
v = int(v)
except ValueError:
pass
if v == 'None':
v = None
elif v == 'False':
v = False
elif v == 'True':
v = True
kwargs[k] = v
return kwargs


def run_cli(args=None):
parser = main_parser()
subparsers = parser.add_subparsers(help='subcommands')
for cmd in [ClusterSubCommand, ConfigsSubCommand, LogDirsSubCommand,
TopicsSubCommand, ConsumerGroupsSubCommand]:
cmd.add_subparser(subparsers)

config = parser.parse_args(args)
if config.log_level:
logging.basicConfig(level=_LOGGING_LEVELS[config.log_level.upper()])
if config.format not in ('raw', 'json'):
raise ValueError('Unrecognized format: %s' % config.format)
logger = logging.getLogger(__name__)

kwargs = build_kwargs(config.extra_config)
client = KafkaAdminClient(bootstrap_servers=config.bootstrap_servers, **kwargs)
try:
result = config.command(client, config)
if config.format == 'raw':
pprint(result)
elif config.format == 'json':
print(json.dumps(result))
return 0
except AttributeError:
parser.print_help()
return 2
except Exception:
logger.exception('Error!')
return 1

if __name__ == '__main__':
import sys
sys.exit(run_cli())


# Commands TODO:
# [acls]
# describe
# create
# delete

# [configs]
# alter
# IncrementalAlterConfigs (not supported yet)

# [partitions]
# create
# alter-reassignments (AlterPartitionReassignments - not supported yet)
# list-reassignments (ListPartitionReassignments - not supported yet)

# [records]
# delete

# [consumer-groups]
# remove-members (not supported yet)
# delete-offsets (not supported yet)
# alter-offsets (not supported yet)

# [offsets]
# list (not supported yet)
# delete (OffsetDelete - not supported yet)

# leader-election
# perform_leader_election

# [log-dirs]
# describe (currently broken)
# alter (AlterReplicaLogDirs - not supported yet)

# [client-quotas]
# describe (DescribeClientQuotas - not supported yet)
# alter (AlterClientQuotas - not supported yet)

# DescribeQuorum (not supported yet)

# [producers]
# describe (DescribeProducers - not supported yet)

# [transactions]
# describe (DescribeTransactions - not supported yet)
# list (ListTransactions - not supported yet)
# abort (not supported yet)

# [topics]
# describe-partitions (DescribeTopicPartitions - not supported yet)

# [cluster]
# describe-features (DescribeFeatures - not supported yet)
# update-features (UpdateFeatures - not supported yet)
# version
# api-versions



16 changes: 16 additions & 0 deletions kafka/cli/admin/cluster/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from __future__ import absolute_import

import sys

from .describe import DescribeCluster


class ClusterSubCommand:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('cluster', help='Manage Kafka Cluster')
commands = parser.add_subparsers()
for cmd in [DescribeCluster]:
cmd.add_subparser(commands)
parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))
9 changes: 9 additions & 0 deletions kafka/cli/admin/cluster/describe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from __future__ import absolute_import


class DescribeCluster:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('describe', help='Describe Kafka Cluster')
parser.set_defaults(command=lambda cli, _args: cli.describe_cluster())
16 changes: 16 additions & 0 deletions kafka/cli/admin/configs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from __future__ import absolute_import

import sys

from .describe import DescribeConfigs


class ConfigsSubCommand:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('configs', help='Manage Kafka Configuration')
commands = parser.add_subparsers()
for cmd in [DescribeConfigs]:
cmd.add_subparser(commands)
parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))
24 changes: 24 additions & 0 deletions kafka/cli/admin/configs/describe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import absolute_import

from kafka.admin.config_resource import ConfigResource


class DescribeConfigs:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('describe', help='Describe Kafka Configs')
parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', default=[])
parser.add_argument('-b', '--broker', type=str, action='append', dest='brokers', default=[])
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
resources = []
for topic in args.topics:
resources.append(ConfigResource('TOPIC', topic))
for broker in args.brokers:
resources.append(ConfigResource('BROKER', broker))

response = client.describe_configs(resources)
return list(zip([(r.resource_type.name, r.name) for r in resources], [{str(vals[0]): vals[1] for vals in r.resources[0][4]} for r in response]))
19 changes: 19 additions & 0 deletions kafka/cli/admin/consumer_groups/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from __future__ import absolute_import

import sys

from .delete import DeleteConsumerGroups
from .describe import DescribeConsumerGroups
from .list import ListConsumerGroups
from .list_offsets import ListConsumerGroupOffsets


class ConsumerGroupsSubCommand:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('consumer-groups', help='Manage Kafka Consumer Groups')
commands = parser.add_subparsers()
for cmd in [ListConsumerGroups, DescribeConsumerGroups, ListConsumerGroupOffsets, DeleteConsumerGroups]:
cmd.add_subparser(commands)
parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))
10 changes: 10 additions & 0 deletions kafka/cli/admin/consumer_groups/delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from __future__ import absolute_import


class DeleteConsumerGroups:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('delete', help='Delete Consumer Groups')
parser.add_argument('-g', '--group-id', type=str, action='append', dest='groups', required=True)
parser.set_defaults(command=lambda cli, args: cli.delete_consumer_groups(args.groups))
10 changes: 10 additions & 0 deletions kafka/cli/admin/consumer_groups/describe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from __future__ import absolute_import


class DescribeConsumerGroups:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('describe', help='Describe Consumer Groups')
parser.add_argument('-g', '--group-id', type=str, action='append', dest='groups', required=True)
parser.set_defaults(command=lambda cli, args: cli.describe_consumer_groups(args.groups))
9 changes: 9 additions & 0 deletions kafka/cli/admin/consumer_groups/list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from __future__ import absolute_import


class ListConsumerGroups:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('list', help='List Consumer Groups')
parser.set_defaults(command=lambda cli, _args: cli.list_consumer_groups())
10 changes: 10 additions & 0 deletions kafka/cli/admin/consumer_groups/list_offsets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from __future__ import absolute_import


class ListConsumerGroupOffsets:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('list-offsets', help='List Offsets for Consumer Group')
parser.add_argument('-g', '--group-id', type=str, required=True)
parser.set_defaults(command=lambda cli, args: cli.list_consumer_group_offsets(args.group_id))
16 changes: 16 additions & 0 deletions kafka/cli/admin/log_dirs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from __future__ import absolute_import

import sys

from .describe import DescribeLogDirs


class LogDirsSubCommand:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('log-dirs', help='Manage Kafka Topic/Partition Log Directories')
commands = parser.add_subparsers()
for cmd in [DescribeLogDirs]:
cmd.add_subparser(commands)
parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))
9 changes: 9 additions & 0 deletions kafka/cli/admin/log_dirs/describe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from __future__ import absolute_import


class DescribeLogDirs:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('describe', help='Get topic log directories for brokers')
parser.set_defaults(command=lambda cli, _args: cli.describe_log_dirs())
19 changes: 19 additions & 0 deletions kafka/cli/admin/topics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from __future__ import absolute_import

import sys

from .create import CreateTopic
from .delete import DeleteTopic
from .describe import DescribeTopics
from .list import ListTopics


class TopicsSubCommand:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('topics', help='List/Describe/Create/Delete Kafka Topics')
commands = parser.add_subparsers()
for cmd in [ListTopics, DescribeTopics, CreateTopic, DeleteTopic]:
cmd.add_subparser(commands)
parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))
18 changes: 18 additions & 0 deletions kafka/cli/admin/topics/create.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from __future__ import absolute_import

from kafka.admin.new_topic import NewTopic


class CreateTopic:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('create', help='Create a Kafka Topic')
parser.add_argument('-t', '--topic', type=str, required=True)
parser.add_argument('--num-partitions', type=int, default=-1)
parser.add_argument('--replication-factor', type=int, default=-1)
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
return client.create_topics([NewTopic(args.topic, args.num_partitions, args.replication_factor)])
Loading
Loading