Console commands #119

Open · wants to merge 4 commits into master
4 changes: 2 additions & 2 deletions example.py
@@ -5,8 +5,8 @@
from kafka.producer import SimpleProducer

def produce_example(client):
    producer = SimpleProducer(client, "my-topic")
    producer.send_messages("test")
    producer = SimpleProducer(client)
    producer.send_messages("my-topic", "test")

def consume_example(client):
    consumer = SimpleConsumer(client, "test-group", "my-topic")
12 changes: 6 additions & 6 deletions kafka/__init__.py
@@ -9,13 +9,13 @@
from kafka.protocol import (
    create_message, create_gzip_message, create_snappy_message
)
from kafka.producer import SimpleProducer, KeyedProducer
from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
from kafka.consumer import SimpleConsumer, MultiProcessConsumer
from kafka.producer import ConsoleProducer, KeyedProducer, SimpleProducer
from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
from kafka.consumer import ConsoleConsumer, MultiProcessConsumer, SimpleConsumer

__all__ = [
    'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
    'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer',
    'MultiProcessConsumer', 'create_message', 'create_gzip_message',
    'create_snappy_message'
    'ConsoleProducer', 'RoundRobinPartitioner', 'HashedPartitioner',
    'ConsoleConsumer', 'SimpleConsumer', 'MultiProcessConsumer',
    'create_message', 'create_gzip_message', 'create_snappy_message'
]
35 changes: 35 additions & 0 deletions kafka/_script_utils.py
@@ -0,0 +1,35 @@
import logging
from optparse import OptionParser, Option, OptionValueError
Collaborator:
optparse is marked as deprecated, use argparse instead.

Deprecated since version 2.7: The optparse module is deprecated and will not be developed further; development will continue with the argparse module.

I believe it will be a drop-in replacement for the changes you are making (a rough argparse sketch follows this file's diff).


from kafka import KafkaClient

logging.basicConfig()

# Add this attribute to easily check if the option is required
Option.ATTRS.append("required")

BROKER_OPTION = Option("-b", "--broker", dest="broker", required=True,
                       help="Required: The address of a kafka broker")
TOPIC_OPTION = Option("-t", "--topic", dest="topic", required=True,
                      help="Required: The topic to consume from")

def parse_options(*extra_options):
    parser = OptionParser()
    options = [BROKER_OPTION, TOPIC_OPTION] + list(extra_options)
    parser.add_options(options)
    (opts, args) = parser.parse_args()

    missing = [o._long_opts[0] for o in options
Collaborator:
Why do we have to check for missing required options like this if the options are already marked as required? I haven't worked with optparse, but I am pretty sure argparse does this for free.

Collaborator (author):
I added the 'required' attr on line 9; it's not handled by optparse. I thought argparse did not support Python 2.6, but now I see that there is a package for it, so I'll change this.

               if o.required and getattr(opts, o.dest) is None]
    if missing:
        parser.error("Missing required option(s) %s" % ", ".join(missing))

    return opts

def get_client(broker, client_id=KafkaClient.CLIENT_ID):
    try:
        (host, port) = broker.split(':')
    except ValueError:
        raise OptionValueError("Broker should be in the form 'host:port'")

    return KafkaClient(host, int(port), client_id)
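
Picking up the argparse suggestion from the review comment above: a minimal sketch of what an argparse-based parse_options could look like (illustrative only, not part of this PR; it keeps the same -b/-t options and lets argparse enforce required=True, so the manual missing-option check disappears):

import argparse


def parse_options(*extra_arguments):
    parser = argparse.ArgumentParser()
    parser.add_argument("-b", "--broker", required=True,
                        help="Required: The address of a kafka broker")
    parser.add_argument("-t", "--topic", required=True,
                        help="Required: The topic to consume from")
    # extra_arguments is assumed to be (args, kwargs) pairs, e.g.
    # ((("-g", "--group"), dict(required=True, help="The consumer group")),)
    for args, kwargs in extra_arguments:
        parser.add_argument(*args, **kwargs)
    # argparse reports missing required arguments and exits by itself
    return parser.parse_args()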
6 changes: 6 additions & 0 deletions kafka/consumer.py
@@ -674,3 +674,9 @@ def get_messages(self, count=1, block=True, timeout=10):
        self._auto_commit()

        return messages


class ConsoleConsumer(SimpleConsumer):
    def run(self):
        for message in self:
            print(message.message.value)
10 changes: 10 additions & 0 deletions kafka/producer.py
@@ -255,3 +255,13 @@ def send(self, topic, key, msg):

    def __repr__(self):
        return '<KeyedProducer batch=%s>' % self.async


class ConsoleProducer(SimpleProducer):
    def run(self, topic):
        # imported for its side effect: readline enables line editing for raw_input()
        import readline
        while True:
            try:
                self.send_messages(topic, raw_input())
            except EOFError:
                break
21 changes: 21 additions & 0 deletions kafka/util.py
@@ -1,6 +1,7 @@
from collections import defaultdict
import struct
from threading import Thread, Event
from time import sleep, time

from kafka.common import BufferUnderflowError

@@ -114,3 +115,23 @@ def stop(self):
        self.active.set()
        self.thread.join(self.t + 1)
        self.timer = None

DEFAULT_TOPIC_CREATION_TIMEOUT_SECONDS = 30
def ensure_topic_creation(client, topic,
                          timeout=DEFAULT_TOPIC_CREATION_TIMEOUT_SECONDS):
    if timeout is not None:
        max_time = time() + timeout

    while timeout is None or timeout > 0:
        client.load_metadata_for_topics(topic)
        if client.has_metadata_for_topic(topic):
            return

        if timeout is not None:
            # If we have a timeout, reduce it to the appropriate value
            timeout = max_time - time()

        sleep(1)

    raise RuntimeError("Unable to create topic %s" % topic)

28 changes: 28 additions & 0 deletions scripts/kp_consumer
@@ -0,0 +1,28 @@
#!/usr/bin/env python

from optparse import Option
import sys

from kafka import ConsoleConsumer
from kafka._script_utils import parse_options, get_client

GROUP_OPTION = Option("-g", "--group", dest="group", required=True,
                      help="Required: The consumer group")
CLIENT_ID = "kp_consumer"

def main():
    options = parse_options(GROUP_OPTION)
    client = get_client(options.broker, CLIENT_ID)
    consumer = ConsoleConsumer(client, options.group, options.topic,
                               auto_commit=False)
    try:
        consumer.run()
    except KeyboardInterrupt:
        consumer.stop()
    finally:
        client.close()
        print("Done!")
    return 0

if __name__ == '__main__':
    sys.exit(main())
22 changes: 22 additions & 0 deletions scripts/kp_create_topic
@@ -0,0 +1,22 @@
#!/usr/bin/env python

import sys

from kafka.util import ensure_topic_creation
from kafka._script_utils import parse_options, get_client

CLIENT_ID = "kp_create_topic"

def main():
    options = parse_options()
    client = get_client(options.broker, CLIENT_ID)
    try:
        print("Creating topic %s..." % options.topic)
        ensure_topic_creation(client, options.topic)
    finally:
        client.close()
        print("Done!")
    return 0

if __name__ == '__main__':
    sys.exit(main())
25 changes: 25 additions & 0 deletions scripts/kp_producer
@@ -0,0 +1,25 @@
#!/usr/bin/env python

import sys

from kafka import ConsoleProducer
from kafka._script_utils import parse_options, get_client

CLIENT_ID = "kp_producer"

def main():
    options = parse_options()
    client = get_client(options.broker, CLIENT_ID)
    producer = ConsoleProducer(client)
    try:
        producer.run(options.topic)
        producer.stop()
    except KeyboardInterrupt:
        producer.stop()
    finally:
        client.close()
        print("Done!")
    return 0

if __name__ == '__main__':
    sys.exit(main())
1 change: 1 addition & 0 deletions setup.py
@@ -27,6 +27,7 @@ def run(self):
cmdclass={"test": Tox},

packages=["kafka"],
scripts=["scripts/kp_consumer", "scripts/kp_producer", "scripts/kp_create_topic"],
Collaborator:
Have you thought about using console_scripts entry points instead? I am (unfortunately) working on Windows during most of my work hours and scripts like these are not portable on this platform. A bat/cmd file can be added to scripts to compensate but console scripts are more portable since they create executables for us.

I probably won't be using these scripts so I don't mind all that much but it may be useful to others.

Collaborator (author):
I did not know about console_scripts. I'll make this change (a rough sketch of the entry_points approach follows this diff).


author="David Arthur",
author_email="[email protected]",
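Following up on the console_scripts suggestion: a rough sketch of the entry_points approach (illustrative only; it assumes the three main() functions move into an importable package, here a hypothetical kafka.scripts, and it requires setuptools rather than plain distutils):

from setuptools import setup

setup(
    name="kafka-python",
    packages=["kafka", "kafka.scripts"],
    entry_points={
        "console_scripts": [
            # each entry generates a platform-appropriate executable wrapper
            "kp_consumer = kafka.scripts.kp_consumer:main",
            "kp_producer = kafka.scripts.kp_producer:main",
            "kp_create_topic = kafka.scripts.kp_create_topic:main",
        ],
    },
)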
15 changes: 1 addition & 14 deletions test/test_integration.py
@@ -9,6 +9,7 @@
from kafka.common import * # noqa
from kafka.codec import has_gzip, has_snappy
from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
from kafka.util import ensure_topic_creation
from .fixtures import ZookeeperFixture, KafkaFixture


@@ -17,20 +18,6 @@ def random_string(l):
    return s


def ensure_topic_creation(client, topic_name):
    times = 0
    while True:
        times += 1
        client.load_metadata_for_topics(topic_name)
        if client.has_metadata_for_topic(topic_name):
            break
        print "Waiting for %s topic to be created" % topic_name
        time.sleep(1)

        if times > 30:
            raise Exception("Unable to create topic %s" % topic_name)


class KafkaTestCase(unittest.TestCase):
    def setUp(self):
        self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10))