-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Console commands #119
base: master
Are you sure you want to change the base?
Console commands #119
Changes from all commits
16153fc
f853617
9e55ebc
ff47b78
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
import logging | ||
from optparse import OptionParser, Option, OptionValueError | ||
|
||
from kafka import KafkaClient | ||
|
||
logging.basicConfig() | ||
|
||
# Add this attribute to easily check if the option is required | ||
Option.ATTRS.append("required") | ||
|
||
BROKER_OPTION = Option("-b", "--broker", dest="broker", required=True, | ||
help="Required: The address of a kafka broker") | ||
TOPIC_OPTION = Option("-t", "--topic", dest="topic", required=True, | ||
help="Required: The topic to consume from") | ||
|
||
def parse_options(*extra_options): | ||
parser = OptionParser() | ||
options = [BROKER_OPTION, TOPIC_OPTION] + list(extra_options) | ||
parser.add_options(options) | ||
(opts, args) = parser.parse_args() | ||
|
||
missing = [o._long_opts[0] for o in options | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we have to check for missing required options this if options are already marked as required? I haven't worked with optparse but I am pretty sure argparse does this for free. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added the 'required' attr in line 9. It's not handled by optparse. I thought argparse does not support python 2.6, but now I see that there is a package for it, so I'll change this. |
||
if o.required and getattr(opts, o.dest) is None] | ||
if missing: | ||
parser.error("Missing required option(s) %s" % ", ".join(missing)) | ||
|
||
return opts | ||
|
||
def get_client(broker, client_id=KafkaClient.CLIENT_ID): | ||
try: | ||
(host, port) = broker.split(':') | ||
except ValueError: | ||
raise OptionValueError("Broker should be in the form 'host:port'") | ||
|
||
return KafkaClient(host, int(port), client_id) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
#!/usr/bin/env python | ||
|
||
from optparse import Option | ||
import sys | ||
|
||
from kafka import ConsoleConsumer | ||
from kafka._script_utils import parse_options, get_client | ||
|
||
GROUP_OPTION = Option("-g", "--group", dest="group", required=True, | ||
help="Required: The consumer group") | ||
CLIENT_ID = "kp_consumer" | ||
|
||
def main(): | ||
options = parse_options(GROUP_OPTION) | ||
client = get_client(options.broker, CLIENT_ID) | ||
consumer = ConsoleConsumer(client, options.group, options.topic, | ||
auto_commit=False) | ||
try: | ||
consumer.run() | ||
except KeyboardInterrupt: | ||
consumer.stop() | ||
finally: | ||
client.close() | ||
print("Done!") | ||
return 0 | ||
|
||
if __name__ == '__main__': | ||
sys.exit(main()) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
#!/usr/bin/env python | ||
|
||
import sys | ||
|
||
from kafka.util import ensure_topic_creation | ||
from kafka._script_utils import parse_options, get_client | ||
|
||
CLIENT_ID = "kp_create_topic" | ||
|
||
def main(): | ||
options = parse_options() | ||
client = get_client(options.broker, CLIENT_ID) | ||
try: | ||
print "Creating topic %s..." % options.topic | ||
ensure_topic_creation(client, options.topic) | ||
finally: | ||
client.close() | ||
print("Done!") | ||
return 0 | ||
|
||
if __name__ == '__main__': | ||
sys.exit(main()) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
#!/usr/bin/env python | ||
|
||
import sys | ||
|
||
from kafka import ConsoleProducer | ||
from kafka._script_utils import parse_options, get_client | ||
|
||
CLIENT_ID = "kp_producer" | ||
|
||
def main(): | ||
options = parse_options() | ||
client = get_client(options.broker, CLIENT_ID) | ||
producer = ConsoleProducer(client) | ||
try: | ||
producer.run(options.topic) | ||
producer.stop() | ||
except KeyboardInterrupt: | ||
producer.stop() | ||
finally: | ||
client.close() | ||
print("Done!") | ||
return 0 | ||
|
||
if __name__ == '__main__': | ||
sys.exit(main()) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ def run(self): | |
cmdclass={"test": Tox}, | ||
|
||
packages=["kafka"], | ||
scripts=["scripts/kp_consumer", "scripts/kp_producer", "scripts/kp_create_topic"], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have you thought about using console_scripts entry points instead? I am (unfortunately) working on Windows during most of my work hours and scripts like these are not portable on this platform. A bat/cmd file can be added to scripts to compensate but console scripts are more portable since they create executables for us. I probably won't be using these scripts so I don't mind all that much but it may be useful to others. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did not know about console_scripts. I'll make this change |
||
|
||
author="David Arthur", | ||
author_email="[email protected]", | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optparse is marked as deprecated, use argparse instead.
I believe it will be a drop in replacement for the changes you are making.