Skip to content

Commit

Permalink
Merge pull request #76 from adafruit/mqtt-group-get
Browse files Browse the repository at this point in the history
Adding Group Features to mqtt_client.py
  • Loading branch information
brentru authored Sep 5, 2018
2 parents da00252 + 9c5399c commit 7de8fd6
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Adafruit_IO/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "2.0.16"
__version__ = "2.0.17"
65 changes: 42 additions & 23 deletions Adafruit_IO/mqtt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,21 +99,24 @@ def _mqtt_disconnect(self, client, userdata, rc):

def _mqtt_message(self, client, userdata, msg):
logger.debug('Client on_message called.')
# Parse out the feed id and call on_message callback.
# Assumes topic looks like "username/feeds/id"
"""Parse out the topic and call on_message callback
assume topic looks like `username/topic/id`
"""
parsed_topic = msg.topic.split('/')
if self.on_message is not None:
feed = parsed_topic[2]
topic = parsed_topic[2]
payload = '' if msg.payload is None else msg.payload.decode('utf-8')
elif self.on_message is not None and parsed_topic[0] == 'time':
feed = parsed_topic[0]
topic = parsed_topic[0]
payload = msg.payload.decode('utf-8')
elif self.on_message is not None and parsed_topic[1] == 'groups':
topic = parsed_topic[3]
payload = msg.payload.decode('utf-8')
self.on_message(self, feed, payload)
self.on_message(self, topic, payload)

def _mqtt_subscribe(client, userdata, mid, granted_qos):
"""Called when broker responds to a subscribe request."""


def connect(self, **kwargs):
"""Connect to the Adafruit.IO service. Must be called before any loop
or publish operations are called. Will raise an exception if a
Expand Down Expand Up @@ -179,15 +182,21 @@ def subscribe(self, feed_id, feed_user=None):
the on_message function will be called with the feed_id and new value.
Params:
- feed_id: The id of the feed to update.
- feed_user (optional): The user id of the feed. Used for feed sharing.
- feed_id: The id of the feed to subscribe to.
- feed_user (optional): The user id of the feed. Used for feed sharing functionality.
"""
if feed_user is not None:
(res, mid) = self._client.subscribe('{0}/feeds/{1}'.format(feed_user, feed_id))
else:
(res, mid) = self._client.subscribe('{0}/feeds/{1}'.format(self._username, feed_id))
return res, mid

def subscribe_group(self, group_id):
"""Subscribe to changes on the specified group. When the group is updated
the on_message function will be called with the group_id and the new value.
"""
self._client.subscribe('{0}/groups/{1}'.format(self._username, group_id))

def subscribe_time(self, time):
"""Subscribe to changes on the Adafruit IO time feeds. When the feed is
updated, the on_message function will be called and publish a new value:
Expand All @@ -204,24 +213,34 @@ def subscribe_time(self, time):
raise TypeError('Invalid Time Feed Specified.')
return

def unsubscribe(self, feed_id):
"""Unsubscribes from a specified MQTT feed.
Note: this does not prevent publishing to a feed, it will unsubscribe
from receiving messages via on_message.
"""
(res, mid) = self._client.unsubscribe('{0}/feeds/{1}'.format(self._username, feed_id))

def publish(self, feed_id, value=None, feed_user=None):
def unsubscribe(self, feed_id=None, group_id=None):
"""Unsubscribes from a specified MQTT topic.
Note: this does not prevent publishing to a topic, it will unsubscribe
from receiving messages via on_message.
"""
if feed_id is not None:
self._client.unsubscribe('{0}/feeds/{1}'.format(self._username, feed_id))
elif group_id is not None:
self._client.unsubscribe('{0}/groups/{1}'.format(self._username, group_id))
else:
raise TypeError('Invalid topic type specified.')
return

def publish(self, feed_id, value=None, group_id=None, feed_user=None):
"""Publish a value to a specified feed.
Params:
- feed_id: The id of the feed to update.
- feed_user (optional): The user id of the feed. Used for feed sharing.
- value: The new value to publish to the feed.
- (optional) group_id: The id of the group to update.
- (optional) feed_user: The feed owner's username. Used for Sharing Feeds.
"""
if feed_user is not None:
(res, self._pub_mid) = self._client.publish('{0}/feeds/{1}'.format(feed_user, feed_id),
payload=value)
else:
(res, self._pub_mid) = self._client.publish('{0}/feeds/{1}'.format(self._username, feed_id),
payload=value)
if feed_user is not None: # shared feed
(res, self._pub_mid) = self._client.publish('{0}/feeds/{1}'.format(feed_user, feed_id),
payload=value)
elif group_id is not None: # group-specified feed
self._client.publish('{0}/feeds/{1}.{2}'.format(self._username, group_id, feed_id),
payload=value)
else: # regular feed
(res, self._pub_mid) = self._client.publish('{0}/feeds/{1}'.format(self._username, feed_id),
payload=value)
80 changes: 80 additions & 0 deletions examples/mqtt/mqtt_groups_pusbsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Example of subscribing to an Adafruit IO group
# and publishing to the feeds within it

# Author: Brent Rubell for Adafruit Industries, 2018

# Import standard python modules.
import random
import sys
import time

# Import Adafruit IO MQTT client.
from Adafruit_IO import MQTTClient

# Set to your Adafruit IO key.
# Remember, your key is a secret,
# so make sure not to publish it when you publish this code!
ADAFRUIT_IO_KEY = 'YOUR_AIO_KEY'

# Set to your Adafruit IO username.
# (go to https://accounts.adafruit.com to find your username)
ADAFRUIT_IO_USERNAME = 'YOUR_AIO_USERNAME'

# Group Name
group_name = 'grouptest'

# Feeds within the group
group_feed_one = 'one'
group_feed_two = 'two'

# Define callback functions which will be called when certain events happen.
def connected(client):
# Connected function will be called when the client is connected to Adafruit IO.
# This is a good place to subscribe to topic changes. The client parameter
# passed to this function is the Adafruit IO MQTT client so you can make
# calls against it easily.
print('Listening for changes on ', group_name)
# Subscribe to changes on a group, `group_name`
client.subscribe_group(group_name)

def disconnected(client):
# Disconnected function will be called when the client disconnects.
print('Disconnected from Adafruit IO!')
sys.exit(1)

def message(client, topic_id, payload):
# Message function will be called when a subscribed topic has a new value.
# The topic_id parameter identifies the topic, and the payload parameter has
# the new value.
print('Topic {0} received new value: {1}'.format(topic_id, payload))


# Create an MQTT client instance.
client = MQTTClient(ADAFRUIT_IO_USERNAME, ADAFRUIT_IO_KEY)

# Setup the callback functions defined above.
client.on_connect = connected
client.on_disconnect = disconnected
client.on_message = message

# Connect to the Adafruit IO server.
client.connect()

# Now the program needs to use a client loop function to ensure messages are
# sent and received. There are a few options for driving the message loop,
# depending on what your program needs to do.

# The first option is to run a thread in the background so you can continue
# doing things in your program.
client.loop_background()
# Now send new values every 5 seconds.
print('Publishing a new message every 5 seconds (press Ctrl-C to quit)...')
while True:
value = random.randint(0, 100)
print('Publishing {0} to {1}.{2}.'.format(value, group_name, group_feed_one))
client.publish('one', value, group_name)

value = random.randint(0,100)
print('Publishing {0} to {1}.{2}.'.format(value, group_name, group_feed_two))
client.publish('two', value, group_name)
time.sleep(5)

0 comments on commit 7de8fd6

Please sign in to comment.