Skip to content

Commit

Permalink
Merge pull request #16 from murthysrd/master
Browse files Browse the repository at this point in the history
Update kafka-python to 2.0.2 to add support for python 3.8
  • Loading branch information
cognifloyd authored Jan 5, 2024
2 parents e8f0708 + 65e9f3d commit 42ec262
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 26 deletions.
7 changes: 7 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## 2.0.0

* Update to kafka-python 2.0.2 to support python 3.8. #16 by @murthysrd
This is a major version bump as topics will no longer be auto-created
by the sensors or by the produce action even if the Kafka server is
configured to auto-create the topics and partitions.

## 1.0.0

* Drop Python 2.7 support
Expand Down
39 changes: 25 additions & 14 deletions actions/produce.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
"""
Kafka Producer Action for stackstorm
"""
from st2common.runners.base_action import Action
from kafka import SimpleProducer, KafkaClient
from kafka.util import kafka_bytestring
from kafka import KafkaProducer, KafkaConsumer


class ProduceMessageAction(Action):
"""
Action to send messages to Apache Kafka system.
"""
DEFAULT_CLIENT_ID = 'st2-kafka-producer'

DEFAULT_CLIENT_ID = "st2-kafka-producer"

def run(self, topic, message, hosts=None):
"""
Expand All @@ -28,18 +31,26 @@ def run(self, topic, message, hosts=None):

if hosts:
_hosts = hosts
elif self.config.get('hosts', None):
_hosts = self.config['hosts']
elif self.config.get("hosts", None):
_hosts = self.config["hosts"]
else:
raise ValueError("Need to define 'hosts' in either action or in config")

# set default for empty value
_client_id = self.config.get('client_id') or self.DEFAULT_CLIENT_ID

client = KafkaClient(_hosts, client_id=_client_id)
client.ensure_topic_exists(topic)
producer = SimpleProducer(client)
result = producer.send_messages(topic, kafka_bytestring(message))

if result[0]:
return result[0]._asdict()
_client_id = self.config.get("client_id") or self.DEFAULT_CLIENT_ID

consumer = KafkaConsumer(
bootstrap_servers=_hosts.split(","), client_id=_client_id
)
if topic not in consumer.topics():
raise Exception(f"Topic does not exist: {topic}")

producer = KafkaProducer(
bootstrap_servers=_hosts.split(","),
client_id=_client_id,
value_serializer=lambda m: m.encode("utf-8"),
max_request_size=10485760,
)
future = producer.send(topic, message)
record_metadata = future.get(timeout=10) # TODO: Make this timeout an input param
return record_metadata._asdict()
2 changes: 1 addition & 1 deletion pack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ keywords:
- base64
- stackdriver
- google cloud
version: 1.0.0
version: 2.0.0
author: StackStorm
email: [email protected]
Contributors:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
kafka-python>=0.9.4,<1.0
kafka-python==2.0.2
14 changes: 9 additions & 5 deletions sensors/gcp_message_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,16 @@ def _ensure_topics_existence(self):
"""
Ensure that topics we're listening to exist.
Fetching metadata for a non-existent topic will automatically try to create it
with the default replication factor and number of partitions (default server config).
Otherwise Kafka server is not configured to auto-create topics and partitions.
This does not fetch metadata for specific topics, so it will not trigger auto-creation
of topics and partitions even if the Kafka server has the default server config that
allows such auto-creation.
"""
map(self._consumer._client.ensure_topic_exists, self._topics)
self._consumer.set_topic_partitions(*self._topics)
cluster_topics = self._consumer.topics()
missing_topics = [topic for topic in self._topics if topic not in cluster_topics]
if missing_topics:
raise Exception(f"One or more topics do not exist: {', '.join(missing_topics)}")
# topic partitions were already subscribed on KafkaConsumer init
# self._consumer.subscribe(topics=self._topics)

def run(self):
"""
Expand Down
14 changes: 9 additions & 5 deletions sensors/message_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,16 @@ def _ensure_topics_existence(self):
"""
Ensure that topics we're listening to exist.
Fetching metadata for a non-existent topic will automatically try to create it
with the default replication factor and number of partitions (default server config).
Otherwise Kafka server is not configured to auto-create topics and partitions.
This does not fetch metadata for specific topics, so it will not trigger auto-creation
of topics and partitions even if the Kafka server has the default server config that
allows such auto-creation.
"""
map(self._consumer._client.ensure_topic_exists, self._topics)
self._consumer.set_topic_partitions(*self._topics)
cluster_topics = self._consumer.topics()
missing_topics = [topic for topic in self._topics if topic not in cluster_topics]
if missing_topics:
raise Exception(f"One or more topics do not exist: {', '.join(missing_topics)}")
# topic partitions were already subscribed on KafkaConsumer init
# self._consumer.subscribe(topics=self._topics)

def run(self):
"""
Expand Down

0 comments on commit 42ec262

Please sign in to comment.