-
Notifications
You must be signed in to change notification settings - Fork 214
/
Copy pathbasic_discovery.py
110 lines (86 loc) · 4.48 KB
/
basic_discovery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0.
import time
import json
from awscrt import io, http
from awscrt.mqtt import QoS
from awsiot.greengrass_discovery import DiscoveryClient
from awsiot import mqtt_connection_builder
from utils.command_line_utils import CommandLineUtils
allowed_actions = ['both', 'publish', 'subscribe']
# cmdData is the arguments/input from the command line placed into a single struct for
# use in this sample. This handles all of the command line parsing, validating, etc.
# See the Utils/CommandLineUtils for more information.
cmdData = CommandLineUtils.parse_sample_input_basic_discovery()
tls_options = io.TlsContextOptions.create_client_with_mtls_from_path(cmdData.input_cert, cmdData.input_key)
if (cmdData.input_ca is not None):
tls_options.override_default_trust_store_from_path(None, cmdData.input_ca)
tls_context = io.ClientTlsContext(tls_options)
socket_options = io.SocketOptions()
proxy_options = None
if cmdData.input_proxy_host is not None and cmdData.input_proxy_port != 0:
proxy_options = http.HttpProxyOptions(cmdData.input_proxy_host, cmdData.input_proxy_port)
print('Performing greengrass discovery...')
discovery_client = DiscoveryClient(
io.ClientBootstrap.get_or_create_static_default(),
socket_options,
tls_context,
cmdData.input_signing_region, None, proxy_options)
resp_future = discovery_client.discover(cmdData.input_thing_name)
discover_response = resp_future.result()
if (cmdData.input_is_ci):
print("Received a greengrass discovery result! Not showing result in CI for possible data sensitivity.")
else:
print(discover_response)
if (cmdData.input_print_discovery_resp_only):
exit(0)
def on_connection_interupted(connection, error, **kwargs):
print('connection interrupted with error {}'.format(error))
def on_connection_resumed(connection, return_code, session_present, **kwargs):
print('connection resumed with return code {}, session present {}'.format(return_code, session_present))
# Try IoT endpoints until we find one that works
def try_iot_endpoints():
for gg_group in discover_response.gg_groups:
for gg_core in gg_group.cores:
for connectivity_info in gg_core.connectivity:
try:
print(
f"Trying core {gg_core.thing_arn} at host {connectivity_info.host_address} port {connectivity_info.port}")
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=connectivity_info.host_address,
port=connectivity_info.port,
cert_filepath=cmdData.input_cert,
pri_key_filepath=cmdData.input_key,
ca_bytes=gg_group.certificate_authorities[0].encode('utf-8'),
on_connection_interrupted=on_connection_interupted,
on_connection_resumed=on_connection_resumed,
client_id=cmdData.input_thing_name,
clean_session=False,
keep_alive_secs=30)
connect_future = mqtt_connection.connect()
connect_future.result()
print('Connected!')
return mqtt_connection
except Exception as e:
print('Connection failed with exception {}'.format(e))
continue
exit('All connection attempts failed')
mqtt_connection = try_iot_endpoints()
if cmdData.input_mode == 'both' or cmdData.input_mode == 'subscribe':
def on_publish(topic, payload, dup, qos, retain, **kwargs):
print('Publish received on topic {}'.format(topic))
print(payload)
subscribe_future, _ = mqtt_connection.subscribe(cmdData.input_topic, QoS.AT_MOST_ONCE, on_publish)
subscribe_result = subscribe_future.result()
loop_count = 0
while loop_count < cmdData.input_max_pub_ops:
if cmdData.input_mode == 'both' or cmdData.input_mode == 'publish':
message = {}
message['message'] = cmdData.input_message
message['sequence'] = loop_count
messageJson = json.dumps(message)
pub_future, _ = mqtt_connection.publish(cmdData.input_topic, messageJson, QoS.AT_LEAST_ONCE)
publish_completion_data = pub_future.result()
print('Published topic {}: {} (puback reason: {})\n'.format(cmdData.input_topic, messageJson, repr(publish_completion_data.puback.reason_code)))
loop_count += 1
time.sleep(1)