Skip to content

Commit 6f6a696

Browse files
authored
python -m cli interfaces for kafka.admin, kafka.consumer, kafka.producer (#2650)
1 parent 179c8ed commit 6f6a696

26 files changed

+589
-13
lines changed

kafka/admin/__main__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from __future__ import absolute_import
2+
3+
import sys
4+
5+
from kafka.cli.admin import run_cli
6+
7+
sys.exit(run_cli())

kafka/admin/config_resource.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,9 @@ def __init__(
3434
self.resource_type = resource_type
3535
self.name = name
3636
self.configs = configs
37+
38+
def __str__(self):
39+
return "ConfigResource %s=%s" % (self.resource_type, self.name)
40+
41+
def __repr__(self):
42+
return "ConfigResource(%s, %s, %s)" % (self.resource_type, self.name, self.configs)

kafka/admin/new_topic.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
from __future__ import absolute_import
22

3-
from kafka.errors import IllegalArgumentError
4-
53

64
class NewTopic(object):
75
""" A class for new topic creation
@@ -16,17 +14,14 @@ class NewTopic(object):
1614
topic_configs (dict of str: str): A mapping of config key
1715
and value for the topic.
1816
"""
19-
2017
def __init__(
2118
self,
2219
name,
23-
num_partitions,
24-
replication_factor,
20+
num_partitions=-1,
21+
replication_factor=-1,
2522
replica_assignments=None,
2623
topic_configs=None,
2724
):
28-
if not (num_partitions == -1 or replication_factor == -1) ^ (replica_assignments is None):
29-
raise IllegalArgumentError('either num_partitions/replication_factor or replica_assignment must be specified')
3025
self.name = name
3126
self.num_partitions = num_partitions
3227
self.replication_factor = replication_factor

kafka/cli/__init__.py

Whitespace-only changes.

kafka/cli/admin/__init__.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
from __future__ import absolute_import
2+
3+
import argparse
4+
import json
5+
import logging
6+
from pprint import pprint
7+
8+
from kafka.admin.client import KafkaAdminClient
9+
from .cluster import ClusterSubCommand
10+
from .configs import ConfigsSubCommand
11+
from .consumer_groups import ConsumerGroupsSubCommand
12+
from .log_dirs import LogDirsSubCommand
13+
from .topics import TopicsSubCommand
14+
15+
def main_parser():
16+
parser = argparse.ArgumentParser(
17+
prog='python -m kafka.admin',
18+
description='Kafka admin client',
19+
)
20+
parser.add_argument(
21+
'-b', '--bootstrap-servers', type=str, action='append', required=True,
22+
help='host:port for cluster bootstrap servers')
23+
parser.add_argument(
24+
'-c', '--extra-config', type=str, action='append',
25+
help='additional configuration properties for admin client')
26+
parser.add_argument(
27+
'-l', '--log-level', type=str,
28+
help='logging level, passed to logging.basicConfig')
29+
parser.add_argument(
30+
'-f', '--format', type=str, default='raw',
31+
help='output format: raw|json')
32+
return parser
33+
34+
35+
_LOGGING_LEVELS = {'NOTSET': 0, 'DEBUG': 10, 'INFO': 20, 'WARNING': 30, 'ERROR': 40, 'CRITICAL': 50}
36+
37+
38+
def build_kwargs(props):
39+
kwargs = {}
40+
for prop in props or []:
41+
k, v = prop.split('=')
42+
try:
43+
v = int(v)
44+
except ValueError:
45+
pass
46+
if v == 'None':
47+
v = None
48+
elif v == 'False':
49+
v = False
50+
elif v == 'True':
51+
v = True
52+
kwargs[k] = v
53+
return kwargs
54+
55+
56+
def run_cli(args=None):
57+
parser = main_parser()
58+
subparsers = parser.add_subparsers(help='subcommands')
59+
for cmd in [ClusterSubCommand, ConfigsSubCommand, LogDirsSubCommand,
60+
TopicsSubCommand, ConsumerGroupsSubCommand]:
61+
cmd.add_subparser(subparsers)
62+
63+
config = parser.parse_args(args)
64+
if config.log_level:
65+
logging.basicConfig(level=_LOGGING_LEVELS[config.log_level.upper()])
66+
if config.format not in ('raw', 'json'):
67+
raise ValueError('Unrecognized format: %s' % config.format)
68+
logger = logging.getLogger(__name__)
69+
70+
kwargs = build_kwargs(config.extra_config)
71+
client = KafkaAdminClient(bootstrap_servers=config.bootstrap_servers, **kwargs)
72+
try:
73+
result = config.command(client, config)
74+
if config.format == 'raw':
75+
pprint(result)
76+
elif config.format == 'json':
77+
print(json.dumps(result))
78+
return 0
79+
except AttributeError:
80+
parser.print_help()
81+
return 2
82+
except Exception:
83+
logger.exception('Error!')
84+
return 1
85+
86+
if __name__ == '__main__':
87+
import sys
88+
sys.exit(run_cli())
89+
90+
91+
# Commands TODO:
92+
# [acls]
93+
# describe
94+
# create
95+
# delete
96+
97+
# [configs]
98+
# alter
99+
# IncrementalAlterConfigs (not supported yet)
100+
101+
# [partitions]
102+
# create
103+
# alter-reassignments (AlterPartitionReassignments - not supported yet)
104+
# list-reassignments (ListPartitionReassignments - not supported yet)
105+
106+
# [records]
107+
# delete
108+
109+
# [consumer-groups]
110+
# remove-members (not supported yet)
111+
# delete-offsets (not supported yet)
112+
# alter-offsets (not supported yet)
113+
114+
# [offsets]
115+
# list (not supported yet)
116+
# delete (OffsetDelete - not supported yet)
117+
118+
# leader-election
119+
# perform_leader_election
120+
121+
# [log-dirs]
122+
# describe (currently broken)
123+
# alter (AlterReplicaLogDirs - not supported yet)
124+
125+
# [client-quotas]
126+
# describe (DescribeClientQuotas - not supported yet)
127+
# alter (AlterClientQuotas - not supported yet)
128+
129+
# DescribeQuorum (not supported yet)
130+
131+
# [producers]
132+
# describe (DescribeProducers - not supported yet)
133+
134+
# [transactions]
135+
# describe (DescribeTransactions - not supported yet)
136+
# list (ListTransactions - not supported yet)
137+
# abort (not supported yet)
138+
139+
# [topics]
140+
# describe-partitions (DescribeTopicPartitions - not supported yet)
141+
142+
# [cluster]
143+
# describe-features (DescribeFeatures - not supported yet)
144+
# update-features (UpdateFeatures - not supported yet)
145+
# version
146+
# api-versions
147+
148+
149+

kafka/cli/admin/cluster/__init__.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from __future__ import absolute_import
2+
3+
import sys
4+
5+
from .describe import DescribeCluster
6+
7+
8+
class ClusterSubCommand:
9+
10+
@classmethod
11+
def add_subparser(cls, subparsers):
12+
parser = subparsers.add_parser('cluster', help='Manage Kafka Cluster')
13+
commands = parser.add_subparsers()
14+
for cmd in [DescribeCluster]:
15+
cmd.add_subparser(commands)
16+
parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))

kafka/cli/admin/cluster/describe.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from __future__ import absolute_import
2+
3+
4+
class DescribeCluster:
5+
6+
@classmethod
7+
def add_subparser(cls, subparsers):
8+
parser = subparsers.add_parser('describe', help='Describe Kafka Cluster')
9+
parser.set_defaults(command=lambda cli, _args: cli.describe_cluster())

kafka/cli/admin/configs/__init__.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from __future__ import absolute_import
2+
3+
import sys
4+
5+
from .describe import DescribeConfigs
6+
7+
8+
class ConfigsSubCommand:
9+
10+
@classmethod
11+
def add_subparser(cls, subparsers):
12+
parser = subparsers.add_parser('configs', help='Manage Kafka Configuration')
13+
commands = parser.add_subparsers()
14+
for cmd in [DescribeConfigs]:
15+
cmd.add_subparser(commands)
16+
parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))

kafka/cli/admin/configs/describe.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from __future__ import absolute_import
2+
3+
from kafka.admin.config_resource import ConfigResource
4+
5+
6+
class DescribeConfigs:
7+
8+
@classmethod
9+
def add_subparser(cls, subparsers):
10+
parser = subparsers.add_parser('describe', help='Describe Kafka Configs')
11+
parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', default=[])
12+
parser.add_argument('-b', '--broker', type=str, action='append', dest='brokers', default=[])
13+
parser.set_defaults(command=cls.command)
14+
15+
@classmethod
16+
def command(cls, client, args):
17+
resources = []
18+
for topic in args.topics:
19+
resources.append(ConfigResource('TOPIC', topic))
20+
for broker in args.brokers:
21+
resources.append(ConfigResource('BROKER', broker))
22+
23+
response = client.describe_configs(resources)
24+
return list(zip([(r.resource_type.name, r.name) for r in resources], [{str(vals[0]): vals[1] for vals in r.resources[0][4]} for r in response]))
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from __future__ import absolute_import
2+
3+
import sys
4+
5+
from .delete import DeleteConsumerGroups
6+
from .describe import DescribeConsumerGroups
7+
from .list import ListConsumerGroups
8+
from .list_offsets import ListConsumerGroupOffsets
9+
10+
11+
class ConsumerGroupsSubCommand:
12+
13+
@classmethod
14+
def add_subparser(cls, subparsers):
15+
parser = subparsers.add_parser('consumer-groups', help='Manage Kafka Consumer Groups')
16+
commands = parser.add_subparsers()
17+
for cmd in [ListConsumerGroups, DescribeConsumerGroups, ListConsumerGroupOffsets, DeleteConsumerGroups]:
18+
cmd.add_subparser(commands)
19+
parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))

0 commit comments

Comments
 (0)