From 16153fcbec84a7f44da7daebcc56f102c7fcdbed Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Tue, 4 Feb 2014 13:42:29 -0800 Subject: [PATCH 1/4] Add console commands for producing, consuming and creating topics --- example.py | 4 ++-- kafka/__init__.py | 12 ++++++------ kafka/_command_utils.py | 25 +++++++++++++++++++++++++ kafka/consumer.py | 6 ++++++ kafka/producer.py | 10 ++++++++++ kafka/util.py | 13 +++++++++++++ scripts/kp_consumer | 27 +++++++++++++++++++++++++++ scripts/kp_create_topic | 22 ++++++++++++++++++++++ scripts/kp_producer | 25 +++++++++++++++++++++++++ setup.py | 1 + test/test_integration.py | 15 +-------------- 11 files changed, 138 insertions(+), 22 deletions(-) create mode 100644 kafka/_command_utils.py create mode 100755 scripts/kp_consumer create mode 100755 scripts/kp_create_topic create mode 100755 scripts/kp_producer diff --git a/example.py b/example.py index 3a2dc928b..846feba7f 100644 --- a/example.py +++ b/example.py @@ -5,8 +5,8 @@ from kafka.producer import SimpleProducer def produce_example(client): - producer = SimpleProducer(client, "my-topic") - producer.send_messages("test") + producer = SimpleProducer(client) + producer.send_messages("my-topic", "test") def consume_example(client): consumer = SimpleConsumer(client, "test-group", "my-topic") diff --git a/kafka/__init__.py b/kafka/__init__.py index 73aa7603c..1584c4792 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -9,13 +9,13 @@ from kafka.protocol import ( create_message, create_gzip_message, create_snappy_message ) -from kafka.producer import SimpleProducer, KeyedProducer -from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner -from kafka.consumer import SimpleConsumer, MultiProcessConsumer +from kafka.producer import ConsoleProducer, KeyedProducer, SimpleProducer +from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner +from kafka.consumer import ConsoleConsumer, MultiProcessConsumer, SimpleConsumer __all__ = [ 'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer', - 'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer', - 'MultiProcessConsumer', 'create_message', 'create_gzip_message', - 'create_snappy_message' + 'ConsoleProducer', 'RoundRobinPartitioner', 'HashedPartitioner', + 'ConsoleConsumer', 'SimpleConsumer', 'MultiProcessConsumer', + 'create_message', 'create_gzip_message', 'create_snappy_message' ] diff --git a/kafka/_command_utils.py b/kafka/_command_utils.py new file mode 100644 index 000000000..6eb6a0ca5 --- /dev/null +++ b/kafka/_command_utils.py @@ -0,0 +1,25 @@ +import logging +from optparse import OptionParser, Option, OptionValueError + +from kafka import KafkaClient + +logging.basicConfig() + +BROKER_OPTION = Option("-b", "--broker", dest="broker", + help="Address of a kafka broker") +TOPIC_OPTION = Option("-t", "--topic", dest="topic", + help="The topic to consume from") + +def parse_options(*extra_options): + parser = OptionParser() + parser.add_options([BROKER_OPTION, TOPIC_OPTION] + list(extra_options)) + (opts, args) = parser.parse_args() + return opts + +def get_client(broker, client_id=KafkaClient.CLIENT_ID): + try: + (host, port) = broker.split(':') + except ValueError: + raise OptionValueError("Broker should be in the form 'host:port'") + + return KafkaClient(host, int(port), client_id) diff --git a/kafka/consumer.py b/kafka/consumer.py index 28b53ec92..d300b51ea 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -674,3 +674,9 @@ def get_messages(self, count=1, block=True, timeout=10): self._auto_commit() return messages + + +class ConsoleConsumer(SimpleConsumer): + def run(self): + for message in self: + print message.message.value diff --git a/kafka/producer.py b/kafka/producer.py index 12a293401..cf7b8003b 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -255,3 +255,13 @@ def send(self, topic, key, msg): def __repr__(self): return '' % self.async + + +class ConsoleProducer(SimpleProducer): + def run(self, topic): + import readline + while True: + try: + self.send_messages(topic, raw_input()) + except EOFError: + break diff --git a/kafka/util.py b/kafka/util.py index 54052fb03..66d4ba142 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,6 +1,7 @@ from collections import defaultdict import struct from threading import Thread, Event +from time import sleep from kafka.common import BufferUnderflowError @@ -114,3 +115,15 @@ def stop(self): self.active.set() self.thread.join(self.t + 1) self.timer = None + + +def ensure_topic_creation(client, topic): + times = 0 + while True: + times += 1 + client.load_metadata_for_topics(topic) + if client.has_metadata_for_topic(topic): + break + if times > 30: + raise RuntimeError("Unable to create topic %s" % topic) + sleep(1) diff --git a/scripts/kp_consumer b/scripts/kp_consumer new file mode 100755 index 000000000..13f737e36 --- /dev/null +++ b/scripts/kp_consumer @@ -0,0 +1,27 @@ +#!/usr/bin/env python + +from optparse import Option +import sys + +from kafka import ConsoleConsumer +from kafka._command_utils import parse_options, get_client + +GROUP_OPTION = Option("-g", "--group", dest="group", help="The consumer group") +CLIENT_ID = "kp_consumer" + +def main(): + options = parse_options(GROUP_OPTION) + client = get_client(options.broker, CLIENT_ID) + consumer = ConsoleConsumer(client, options.group, options.topic, + auto_commit=False) + try: + consumer.run() + except KeyboardInterrupt: + consumer.stop() + finally: + client.close() + print "Done!" + return 0 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/scripts/kp_create_topic b/scripts/kp_create_topic new file mode 100755 index 000000000..d505678ee --- /dev/null +++ b/scripts/kp_create_topic @@ -0,0 +1,22 @@ +#!/usr/bin/env python + +import sys + +from kafka.util import ensure_topic_creation +from kafka._command_utils import parse_options, get_client + +CLIENT_ID = "kp_create_topic" + +def main(): + options = parse_options() + client = get_client(options.broker, CLIENT_ID) + try: + print "Creating topic %s..." % options.topic + ensure_topic_creation(client, options.topic) + finally: + client.close() + print "Done!" + return 0 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/scripts/kp_producer b/scripts/kp_producer new file mode 100755 index 000000000..87f2ce389 --- /dev/null +++ b/scripts/kp_producer @@ -0,0 +1,25 @@ +#!/usr/bin/env python + +import sys + +from kafka import ConsoleProducer +from kafka._command_utils import parse_options, get_client + +CLIENT_ID = "kp_producer" + +def main(): + options = parse_options() + client = get_client(options.broker, CLIENT_ID) + producer = ConsoleProducer(client) + try: + producer.run(options.topic) + producer.stop() + except KeyboardInterrupt: + producer.stop() + finally: + client.close() + print "Done!" + return 0 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/setup.py b/setup.py index 0869fee3e..1f9ca26c5 100644 --- a/setup.py +++ b/setup.py @@ -27,6 +27,7 @@ def run(self): cmdclass={"test": Tox}, packages=["kafka"], + scripts=["scripts/kp_consumer", "scripts/kp_producer", "scripts/kp_create_topic"], author="David Arthur", author_email="mumrah@gmail.com", diff --git a/test/test_integration.py b/test/test_integration.py index d0da523eb..5b317da72 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -9,6 +9,7 @@ from kafka.common import * # noqa from kafka.codec import has_gzip, has_snappy from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES +from kafka.util import ensure_topic_creation from .fixtures import ZookeeperFixture, KafkaFixture @@ -17,20 +18,6 @@ def random_string(l): return s -def ensure_topic_creation(client, topic_name): - times = 0 - while True: - times += 1 - client.load_metadata_for_topics(topic_name) - if client.has_metadata_for_topic(topic_name): - break - print "Waiting for %s topic to be created" % topic_name - time.sleep(1) - - if times > 30: - raise Exception("Unable to create topic %s" % topic_name) - - class KafkaTestCase(unittest.TestCase): def setUp(self): self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10)) From f8536170f1f5a89166fe41bfbfe38c9f6ac80c5c Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Tue, 4 Feb 2014 14:19:42 -0800 Subject: [PATCH 2/4] Use `print(...)` instead of `print ...` since print is a function in Python 3.0 --- kafka/consumer.py | 2 +- scripts/kp_consumer | 2 +- scripts/kp_create_topic | 2 +- scripts/kp_producer | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index d300b51ea..5658de2f2 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -679,4 +679,4 @@ def get_messages(self, count=1, block=True, timeout=10): class ConsoleConsumer(SimpleConsumer): def run(self): for message in self: - print message.message.value + print(message.message.value) diff --git a/scripts/kp_consumer b/scripts/kp_consumer index 13f737e36..ab91e8d41 100755 --- a/scripts/kp_consumer +++ b/scripts/kp_consumer @@ -20,7 +20,7 @@ def main(): consumer.stop() finally: client.close() - print "Done!" + print("Done!") return 0 if __name__ == '__main__': diff --git a/scripts/kp_create_topic b/scripts/kp_create_topic index d505678ee..ec8ff1f32 100755 --- a/scripts/kp_create_topic +++ b/scripts/kp_create_topic @@ -15,7 +15,7 @@ def main(): ensure_topic_creation(client, options.topic) finally: client.close() - print "Done!" + print("Done!") return 0 if __name__ == '__main__': diff --git a/scripts/kp_producer b/scripts/kp_producer index 87f2ce389..55202513e 100755 --- a/scripts/kp_producer +++ b/scripts/kp_producer @@ -18,7 +18,7 @@ def main(): producer.stop() finally: client.close() - print "Done!" + print("Done!") return 0 if __name__ == '__main__': From 9e55ebc790f1a2aad9ed2eae3496ba61341d7e2e Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Tue, 4 Feb 2014 14:36:50 -0800 Subject: [PATCH 3/4] Make ensure_topic_creation take an optional timeout Seems better to use an optional timeout with a default value rather than hard-coding 30 retries, especially since each request can block for a long time then fail. --- kafka/util.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/kafka/util.py b/kafka/util.py index 66d4ba142..16f0a6f19 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,7 +1,7 @@ from collections import defaultdict import struct from threading import Thread, Event -from time import sleep +from time import sleep, time from kafka.common import BufferUnderflowError @@ -116,14 +116,22 @@ def stop(self): self.thread.join(self.t + 1) self.timer = None +DEFAULT_TOPIC_CREATION_TIMEOUT_SECONDS = 30 +def ensure_topic_creation(client, topic, + timeout=DEFAULT_TOPIC_CREATION_TIMEOUT_SECONDS): + if timeout is not None: + max_time = time() + timeout -def ensure_topic_creation(client, topic): - times = 0 - while True: - times += 1 + while timeout is None or timeout > 0: client.load_metadata_for_topics(topic) if client.has_metadata_for_topic(topic): - break - if times > 30: - raise RuntimeError("Unable to create topic %s" % topic) + return + + if timeout is not None: + # If we have a timeout, reduce it to the appropriate value + timeout = max_time - time() + sleep(1) + + raise RuntimeError("Unable to create topic %s" % topic) + From ff47b789ee942dd045f1e510c919150d28a332a1 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Tue, 4 Feb 2014 17:18:40 -0800 Subject: [PATCH 4/4] Handle missing required arguments, move _command_utils to _script_utils --- kafka/_command_utils.py | 25 ------------------------- kafka/_script_utils.py | 35 +++++++++++++++++++++++++++++++++++ scripts/kp_consumer | 5 +++-- scripts/kp_create_topic | 2 +- scripts/kp_producer | 2 +- 5 files changed, 40 insertions(+), 29 deletions(-) delete mode 100644 kafka/_command_utils.py create mode 100644 kafka/_script_utils.py diff --git a/kafka/_command_utils.py b/kafka/_command_utils.py deleted file mode 100644 index 6eb6a0ca5..000000000 --- a/kafka/_command_utils.py +++ /dev/null @@ -1,25 +0,0 @@ -import logging -from optparse import OptionParser, Option, OptionValueError - -from kafka import KafkaClient - -logging.basicConfig() - -BROKER_OPTION = Option("-b", "--broker", dest="broker", - help="Address of a kafka broker") -TOPIC_OPTION = Option("-t", "--topic", dest="topic", - help="The topic to consume from") - -def parse_options(*extra_options): - parser = OptionParser() - parser.add_options([BROKER_OPTION, TOPIC_OPTION] + list(extra_options)) - (opts, args) = parser.parse_args() - return opts - -def get_client(broker, client_id=KafkaClient.CLIENT_ID): - try: - (host, port) = broker.split(':') - except ValueError: - raise OptionValueError("Broker should be in the form 'host:port'") - - return KafkaClient(host, int(port), client_id) diff --git a/kafka/_script_utils.py b/kafka/_script_utils.py new file mode 100644 index 000000000..b784d151b --- /dev/null +++ b/kafka/_script_utils.py @@ -0,0 +1,35 @@ +import logging +from optparse import OptionParser, Option, OptionValueError + +from kafka import KafkaClient + +logging.basicConfig() + +# Add this attribute to easily check if the option is required +Option.ATTRS.append("required") + +BROKER_OPTION = Option("-b", "--broker", dest="broker", required=True, + help="Required: The address of a kafka broker") +TOPIC_OPTION = Option("-t", "--topic", dest="topic", required=True, + help="Required: The topic to consume from") + +def parse_options(*extra_options): + parser = OptionParser() + options = [BROKER_OPTION, TOPIC_OPTION] + list(extra_options) + parser.add_options(options) + (opts, args) = parser.parse_args() + + missing = [o._long_opts[0] for o in options + if o.required and getattr(opts, o.dest) is None] + if missing: + parser.error("Missing required option(s) %s" % ", ".join(missing)) + + return opts + +def get_client(broker, client_id=KafkaClient.CLIENT_ID): + try: + (host, port) = broker.split(':') + except ValueError: + raise OptionValueError("Broker should be in the form 'host:port'") + + return KafkaClient(host, int(port), client_id) diff --git a/scripts/kp_consumer b/scripts/kp_consumer index ab91e8d41..933b058d1 100755 --- a/scripts/kp_consumer +++ b/scripts/kp_consumer @@ -4,9 +4,10 @@ from optparse import Option import sys from kafka import ConsoleConsumer -from kafka._command_utils import parse_options, get_client +from kafka._script_utils import parse_options, get_client -GROUP_OPTION = Option("-g", "--group", dest="group", help="The consumer group") +GROUP_OPTION = Option("-g", "--group", dest="group", required=True, + help="Required: The consumer group") CLIENT_ID = "kp_consumer" def main(): diff --git a/scripts/kp_create_topic b/scripts/kp_create_topic index ec8ff1f32..38cb268f0 100755 --- a/scripts/kp_create_topic +++ b/scripts/kp_create_topic @@ -3,7 +3,7 @@ import sys from kafka.util import ensure_topic_creation -from kafka._command_utils import parse_options, get_client +from kafka._script_utils import parse_options, get_client CLIENT_ID = "kp_create_topic" diff --git a/scripts/kp_producer b/scripts/kp_producer index 55202513e..1cfd8389c 100755 --- a/scripts/kp_producer +++ b/scripts/kp_producer @@ -3,7 +3,7 @@ import sys from kafka import ConsoleProducer -from kafka._command_utils import parse_options, get_client +from kafka._script_utils import parse_options, get_client CLIENT_ID = "kp_producer"