forked from aws/aws-iot-device-sdk-python-v2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathshadow.py
344 lines (289 loc) · 13.9 KB
/
shadow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0.
import argparse
from awscrt import auth, io, mqtt, http
from awsiot import iotshadow
from awsiot import mqtt_connection_builder
from concurrent.futures import Future
import sys
import threading
import traceback
from uuid import uuid4
# - Overview -
# This sample uses the AWS IoT Device Shadow Service to keep a property in
# sync between device and server. Imagine a light whose color may be changed
# through an app, or set by a local user.
#
# - Instructions -
# Once connected, type a value in the terminal and press Enter to update
# the property's "reported" value. The sample also responds when the "desired"
# value changes on the server. To observe this, edit the Shadow document in
# the AWS Console and set a new "desired" value.
#
# - Detail -
# On startup, the sample requests the shadow document to learn the property's
# initial state. The sample also subscribes to "delta" events from the server,
# which are sent when a property's "desired" value differs from its "reported"
# value. When the sample learns of a new desired value, that value is changed
# on the device and an update is sent to the server with the new "reported"
# value.
# Command-line interface for the sample.
# --endpoint and --thing-name are always required; every other option has a
# default or may be omitted.
parser = argparse.ArgumentParser(description="Device Shadow sample keeps a property in sync across client and server")
parser.add_argument('--endpoint', required=True, help="Your AWS IoT custom endpoint, not including a port. " +
                                                      "Ex: \"w6zbse3vjd5b4p-ats.iot.us-west-2.amazonaws.com\"")
parser.add_argument('--cert', help="File path to your client certificate, in PEM format")
parser.add_argument('--key', help="File path to your private key file, in PEM format")
parser.add_argument('--root-ca', help="File path to root certificate authority, in PEM format. " +
                                      "Necessary if MQTT server uses a certificate that's not already in " +
                                      "your trust store")
# A random suffix gives each run its own default client ID.
parser.add_argument('--client-id', default="test-" + str(uuid4()), help="Client ID for MQTT connection.")
parser.add_argument('--thing-name', required=True, help="The name assigned to your IoT Thing")
parser.add_argument('--shadow-property', default="color", help="Name of property in shadow to keep in sync")
parser.add_argument('--use-websocket', default=False, action='store_true',
                    help="To use a websocket instead of raw mqtt. If you " +
                         "specify this option you must specify a region for signing, you can also enable proxy mode.")
# The help text names the flag exactly as it is declared above (--use-websocket).
parser.add_argument('--signing-region', default='us-east-1', help="If you specify --use-websocket, this " +
                                                                  "is the region that will be used for computing the Sigv4 signature")
parser.add_argument('--proxy-host', help="Hostname for proxy to connect to. Note: if you use this feature, " +
                                         "you will likely need to set --root-ca to the ca for your proxy.")
parser.add_argument('--proxy-port', type=int, default=8080, help="Port for proxy to connect to.")
# Valid choices are the member names of io.LogLevel.
parser.add_argument('--verbosity', choices=[x.name for x in io.LogLevel], default=io.LogLevel.NoLogs.name,
                    help='Logging level')
# Using globals to simplify sample code

# Value written to the shadow when it has no usable value for the property.
SHADOW_VALUE_DEFAULT = "off"

# Set once the MQTT connection has been closed; the main thread waits on it.
is_sample_done = threading.Event()

# Both are assigned in the __main__ section before any callback can run.
shadow_client = None
mqtt_connection = None

# Filled in from the command-line arguments in the __main__ section.
shadow_property = ""
thing_name = ""
class LockedData:
    """State shared between the callbacks and the user-input thread.

    Code in this file reads and writes the fields while holding ``lock``.
    """

    def __init__(self):
        # Last value this device knows for the shadow property; None until
        # a value has been learned or set.
        self.shadow_value = None
        # True once a disconnect has been started, so it happens only once.
        self.disconnect_called = False
        # Guards the two fields above.
        self.lock = threading.Lock()


locked_data = LockedData()
# Function for gracefully quitting this sample
def exit(msg_or_exception):
    """Report why the sample is stopping and begin a graceful disconnect.

    Args:
        msg_or_exception: Either an ``Exception`` (its traceback is printed)
            or any other object, which is printed as the reason for exiting.

    The disconnect is started at most once, no matter how many times or from
    how many threads this function is called. ``is_sample_done`` is set when
    the disconnect completes.
    """
    if isinstance(msg_or_exception, Exception):
        print("Exiting sample due to exception.")
        # Use the traceback stored on the exception itself: sys.exc_info()
        # is empty whenever this is called outside an `except` block.
        traceback.print_exception(
            msg_or_exception.__class__, msg_or_exception, msg_or_exception.__traceback__)
    else:
        print("Exiting sample:", msg_or_exception)

    with locked_data.lock:
        if locked_data.disconnect_called:
            return

        print("Disconnecting...")
        locked_data.disconnect_called = True

        if mqtt_connection is None:
            # No connection was ever created, so there is nothing to close;
            # release the main thread directly instead of failing here.
            is_sample_done.set()
            return

        future = mqtt_connection.disconnect()
        future.add_done_callback(on_disconnected)
def on_disconnected(disconnect_future):
    # type: (Future) -> None
    """Done-callback for the disconnect future started by exit().

    The future's result is not inspected; the sample is simply marked as
    finished so the main thread can stop waiting.
    """
    print("Disconnected.")

    # Signal that sample is finished
    is_sample_done.set()
def on_get_shadow_accepted(response):
    # type: (iotshadow.GetShadowResponse) -> None
    """Handle the shadow document returned by the initial "get" request.

    Order of preference for the local value:
      1. If a value is already held locally, the response is ignored.
      2. A value for the property in the document's ``delta`` section is
         applied via change_shadow_value().
      3. A value in the ``reported`` section is adopted as the local value
         without publishing an update.
      4. Otherwise the property is set to SHADOW_VALUE_DEFAULT.

    Any exception shuts the sample down through exit().
    """
    try:
        print("Finished getting initial shadow state.")

        with locked_data.lock:
            if locked_data.shadow_value is not None:
                print(" Ignoring initial query because a delta event has already been received.")
                return

        if response.state:
            if response.state.delta:
                value = response.state.delta.get(shadow_property)
                # Compare against None rather than truthiness so legitimate
                # falsy values (0, False, "") are not mistaken for "missing".
                if value is not None:
                    print(" Shadow contains delta value '{}'.".format(value))
                    change_shadow_value(value)
                    return

            if response.state.reported:
                value = response.state.reported.get(shadow_property)
                if value is not None:
                    print(" Shadow contains reported value '{}'.".format(value))
                    set_local_value_due_to_initial_query(value)
                    return

        print(" Shadow document lacks '{}' property. Setting defaults...".format(shadow_property))
        change_shadow_value(SHADOW_VALUE_DEFAULT)

    except Exception as e:
        exit(e)
def on_get_shadow_rejected(error):
    # type: (iotshadow.ErrorResponse) -> None
    """Handle a rejected "get" request.

    A 404 code is treated as "no shadow document yet" and the shadow is
    created with SHADOW_VALUE_DEFAULT. Any other code ends the sample.
    """
    # Guarded like the other callbacks in this file, so a failure while
    # creating the default shadow shuts the sample down through exit().
    try:
        if error.code == 404:
            print("Thing has no shadow document. Creating with defaults...")
            change_shadow_value(SHADOW_VALUE_DEFAULT)
        else:
            exit("Get request was rejected. code:{} message:'{}'".format(
                error.code, error.message))

    except Exception as e:
        exit(e)
def on_shadow_delta_updated(delta):
    # type: (iotshadow.ShadowDeltaUpdatedEvent) -> None
    """React to a "delta" event from the shadow service.

    If the event carries the tracked property, its value becomes the new
    local value; a value of None means the property was deleted, so the
    default is restored. Events that do not mention the property are only
    logged. Any exception shuts the sample down through exit().
    """
    try:
        print("Received shadow delta event.")

        # Nothing to do when the event does not mention our property.
        if not delta.state or shadow_property not in delta.state:
            print(" Delta did not report a change in '{}'".format(shadow_property))
            return

        desired = delta.state[shadow_property]
        if desired is not None:
            print(" Delta reports that desired value is '{}'. Changing local value...".format(desired))
            change_shadow_value(desired)
            return

        print(" Delta reports that '{}' was deleted. Resetting defaults...".format(shadow_property))
        change_shadow_value(SHADOW_VALUE_DEFAULT)

    except Exception as err:
        exit(err)
def on_publish_update_shadow(future):
    # type: (Future) -> None
    """Done-callback for the future returned when an update is published.

    Calling result() re-raises any error recorded on the future; in that
    case the sample is shut down through exit().
    """
    try:
        # Only the success/failure of the publish matters, not its value.
        future.result()
        print("Update request published.")
    except Exception as err:
        print("Failed to publish update request.")
        exit(err)
def on_update_shadow_accepted(response):
    # type: (iotshadow.UpdateShadowResponse) -> None
    """Handle an accepted shadow update.

    Prints the property's value from the response's ``reported`` state and
    prompts the user for the next value. If that value cannot be read from
    the response, the sample is shut down through exit().
    """
    try:
        print("Finished updating reported shadow value to '{}'.".format(response.state.reported[shadow_property])) # type: ignore
        print("Enter desired value: ") # remind user they can input new values
    except Exception:
        # Catch Exception rather than using a bare `except:` so that
        # SystemExit and KeyboardInterrupt are not swallowed here.
        exit("Updated shadow is missing the target property.")
def on_update_shadow_rejected(error):
    # type: (iotshadow.ErrorResponse) -> None
    """Handle a rejected shadow update by ending the sample.

    The error's code and message are included in the exit reason.
    """
    reason = "Update request was rejected. code:{} message:'{}'".format(
        error.code, error.message)
    exit(reason)
def set_local_value_due_to_initial_query(reported_value):
    """Adopt the value reported by the initial shadow query as the local value.

    Unlike change_shadow_value(), this only records the value locally; no
    update is published.
    """
    # Record the value under the lock that guards the shared state.
    with locked_data.lock:
        locked_data.shadow_value = reported_value

    # remind user they can input new values
    print("Enter desired value: ")
def change_shadow_value(value):
    """Set the local value and publish it to the shadow service.

    If the local value already equals ``value`` nothing is published and the
    user is simply prompted again. Otherwise the local value is replaced and
    an update is published that sets both the "reported" and the "desired"
    state of the property to ``value``.
    """
    with locked_data.lock:
        already_current = locked_data.shadow_value == value
        if already_current:
            print("Local value is already '{}'.".format(value))
            # remind user they can input new values
            print("Enter desired value: ")
            return

        print("Changed local shadow value to '{}'.".format(value))
        locked_data.shadow_value = value

    # The publish is issued after the lock has been released.
    print("Updating reported shadow value to '{}'...".format(value))
    new_state = iotshadow.ShadowState(
        reported={shadow_property: value},
        desired={shadow_property: value},
    )
    update_request = iotshadow.UpdateShadowRequest(thing_name=thing_name, state=new_state)
    publish_future = shadow_client.publish_update_shadow(update_request, mqtt.QoS.AT_LEAST_ONCE)
    publish_future.add_done_callback(on_publish_update_shadow)
def user_input_thread_fn():
    """Read lines from standard input until the user quits or an error occurs.

    Each line becomes the new shadow value, except for 'exit' and 'quit',
    which end the sample. Any exception also ends the sample and the loop.
    """
    quit_commands = ('exit', 'quit')

    while True:
        try:
            # Read user input
            entered = input()

            # Anything other than a quit command is a new shadow value.
            if entered not in quit_commands:
                change_shadow_value(entered)
                continue

            exit("User has quit")
            return

        except Exception as err:
            print("Exception on input thread.")
            exit(err)
            return
if __name__ == '__main__':
    # Script entry point: parse arguments, connect, subscribe to the shadow
    # topics, request the current shadow state, then wait until the sample
    # is told to finish.

    # Process input args
    args = parser.parse_args()

    # A direct MQTT connection is built from the certificate and private key
    # files, so fail early with a clear message when either one is missing.
    if not args.use_websocket and not (args.cert and args.key):
        parser.error("--cert and --key are required unless --use-websocket is specified")

    thing_name = args.thing_name
    shadow_property = args.shadow_property
    io.init_logging(getattr(io.LogLevel, args.verbosity), 'stderr')

    # Spin up resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    if args.use_websocket:
        # The proxy is optional and only used for websocket connections.
        proxy_options = None
        if args.proxy_host:
            proxy_options = http.HttpProxyOptions(host_name=args.proxy_host, port=args.proxy_port)

        credentials_provider = auth.AwsCredentialsProvider.new_default_chain(client_bootstrap)
        mqtt_connection = mqtt_connection_builder.websockets_with_default_aws_signing(
            endpoint=args.endpoint,
            client_bootstrap=client_bootstrap,
            region=args.signing_region,
            credentials_provider=credentials_provider,
            websocket_proxy_options=proxy_options,
            ca_filepath=args.root_ca,
            client_id=args.client_id,
            clean_session=False,
            keep_alive_secs=6)

    else:
        mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=args.endpoint,
            cert_filepath=args.cert,
            pri_key_filepath=args.key,
            client_bootstrap=client_bootstrap,
            ca_filepath=args.root_ca,
            client_id=args.client_id,
            clean_session=False,
            keep_alive_secs=6)

    print("Connecting to {} with client ID '{}'...".format(
        args.endpoint, args.client_id))

    connected_future = mqtt_connection.connect()

    shadow_client = iotshadow.IotShadowClient(mqtt_connection)

    # Wait for connection to be fully established.
    # Note that it's not necessary to wait, commands issued to the
    # mqtt_connection before it's fully connected will simply be queued.
    # But this sample waits here so it's obvious when a connection
    # fails or succeeds.
    connected_future.result()
    print("Connected!")

    try:
        # Subscribe to necessary topics.
        # Note that it **is** important to wait for "accepted/rejected" subscriptions
        # to succeed before publishing the corresponding "request".
        print("Subscribing to Delta events...")
        delta_subscribed_future, _ = shadow_client.subscribe_to_shadow_delta_updated_events(
            request=iotshadow.ShadowDeltaUpdatedSubscriptionRequest(thing_name=args.thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_shadow_delta_updated)

        # Wait for subscription to succeed
        delta_subscribed_future.result()

        print("Subscribing to Update responses...")
        update_accepted_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_accepted(
            request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=args.thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_update_shadow_accepted)

        update_rejected_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_rejected(
            request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=args.thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_update_shadow_rejected)

        # Wait for subscriptions to succeed
        update_accepted_subscribed_future.result()
        update_rejected_subscribed_future.result()

        print("Subscribing to Get responses...")
        get_accepted_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_accepted(
            request=iotshadow.GetShadowSubscriptionRequest(thing_name=args.thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_get_shadow_accepted)

        get_rejected_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_rejected(
            request=iotshadow.GetShadowSubscriptionRequest(thing_name=args.thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_get_shadow_rejected)

        # Wait for subscriptions to succeed
        get_accepted_subscribed_future.result()
        get_rejected_subscribed_future.result()

        # The rest of the sample runs asynchronously.

        # Issue request for shadow's current state.
        # The response will be received by the on_get_shadow_accepted() callback
        print("Requesting current shadow state...")
        publish_get_future = shadow_client.publish_get_shadow(
            request=iotshadow.GetShadowRequest(thing_name=args.thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE)

        # Ensure that publish succeeds
        publish_get_future.result()

        # Launch thread to handle user input.
        # A "daemon" thread won't prevent the program from shutting down.
        print("Launching thread to read user input...")
        user_input_thread = threading.Thread(target=user_input_thread_fn, name='user_input_thread')
        user_input_thread.daemon = True
        user_input_thread.start()

    except Exception as e:
        exit(e)

    # Wait for the sample to finish (user types 'quit', or an error occurs)
    is_sample_done.wait()