1
- """ mqttgpio - a shim between MQTT and GPIO on raspberry Pi """
1
+ """mqttgpio - a shim between MQTT and GPIO on raspberry Pi"""
2
2
3
+ from configparser import ConfigParser
3
4
import json
4
5
import logging
5
6
6
- import gpiozero # type: ignore
7
- import paho .mqtt .client as mqtt # type: ignore
8
-
9
- class GPIOSwitch (): #pylint: disable=too-many-instance-attributes
10
- """ a single pin controller """
11
- def __init__ (self , name : str ,
12
- pin : int ,
13
- client : mqtt .Client ,
14
- qos : int ,
15
- logging_object : logging .Logger ,
16
- initial_state : bool = False ,
17
- mock_pins : bool = False ,
18
- ): #pylint: disable=too-many-arguments
7
+ import gpiozero # type: ignore
8
+ import paho .mqtt .client as mqtt
9
+ from paho .mqtt .client import MQTTMessageInfo
10
+
11
+ CONFIG_FILES = ["/etc/mqttgpio.conf" , "./mqttgpio.conf" , "/opt/mqttgpio/mqttgpio.conf" ]
12
+
13
+ LOG_LEVELS = {
14
+ "info" : logging .INFO ,
15
+ "debug" : logging .DEBUG ,
16
+ "warning" : logging .WARNING ,
17
+ "error" : logging .ERROR ,
18
+ }
19
+
20
+
21
+ def load_config (log_object : logging .Logger ) -> ConfigParser :
22
+ config = ConfigParser ()
23
+ parsed_files = config .read (CONFIG_FILES )
24
+
25
+ LOG_LEVEL = config .get ("Default" , "logging" , fallback = "info" )
26
+
27
+ if LOG_LEVEL .lower () in LOG_LEVELS :
28
+ log_object .setLevel (LOG_LEVELS [LOG_LEVEL .lower ()])
29
+ else :
30
+ log_object .setLevel (logging .DEBUG )
31
+ log_object .debug (
32
+ "Configuration file had a misconfigured 'logging' setting (%s) - setting to DEBUG" ,
33
+ LOG_LEVEL ,
34
+ ) # pylint: disable=line-too-long
35
+
36
+ log_object .info ("Loaded configuration from: %s" , "," .join (parsed_files ))
37
+
38
+ return config
39
+
40
+
41
+ class GPIOSwitch :
42
+ """a single pin controller"""
43
+
44
+ def __init__ (
45
+ self ,
46
+ name : str ,
47
+ pin : int ,
48
+ client : mqtt .Client ,
49
+ qos : int ,
50
+ logging_object : logging .Logger ,
51
+ initial_state : bool = False ,
52
+ mock_pins : bool = False ,
53
+ ) -> None :
19
54
self .name = name
20
- self .device_class = ' switch'
55
+ self .device_class = " switch"
21
56
self .client = client
22
57
self .mqtt_qos = qos
23
58
self .logger = logging_object
24
59
self .mock_pins = mock_pins
25
60
if mock_pins :
26
61
self .pin_io = gpiozero .Device .pin_factory .pin (pin )
27
62
else :
28
- self .pin_io = gpiozero .LED (pin ) # pylint: disable=undefined-variable
63
+ self .pin_io = gpiozero .LED (pin ) # pylint: disable=undefined-variable
29
64
30
65
# might as well say hello on startup
31
66
self .announce_config ()
32
67
self ._set_state (initial_state )
33
68
34
- def str_state (self ):
35
- """ returns the state in the home assistant version """
69
+ def str_state (self ) -> str :
70
+ """returns the state in the home assistant version"""
36
71
if self .state :
37
72
return "ON"
38
73
return "OFF"
39
74
40
- def config_topic (self ):
41
- """ returns the config topic """
75
+ def config_topic (self ) -> str :
76
+ """returns the config topic"""
42
77
return f"homeassistant/{ self .device_class } /{ self .name } /config"
43
78
44
- def state_topic (self ):
45
- """ returns the state topic as a string """
79
+ def state_topic (self ) -> str :
80
+ """returns the state topic as a string"""
46
81
return f"{ self .name } /state"
47
82
48
- def command_topic (self ):
49
- """ returns the command topic as a string """
83
+ def command_topic (self ) -> str :
84
+ """returns the command topic as a string"""
50
85
return f"{ self .name } /cmnd"
51
86
52
- def _publish (self , topic , payload ) :
53
- """ publishes a message """
54
- return self .client .publish (topic ,
55
- payload ,
56
- qos = self . mqtt_qos ,
57
- )
58
-
87
+ def _publish (self , topic : str , payload : str ) -> MQTTMessageInfo :
88
+ """publishes a message"""
89
+ return self .client .publish (
90
+ topic ,
91
+ payload ,
92
+ qos = self . mqtt_qos ,
93
+ )
59
94
60
- def announce_config (self ):
61
- """ sends the MQTT message to configure
62
- home assistant
95
+ def announce_config (self ) -> None :
96
+ """sends the MQTT message to configure
97
+ home assistant
63
98
"""
64
99
payload = {
65
- ' name' : self .name ,
66
- ' state_topic' : self .state_topic (),
67
- ' command_topic' : self .command_topic (),
68
- "val_tpl" : ' {{value_json.POWER}}' ,
100
+ " name" : self .name ,
101
+ " state_topic" : self .state_topic (),
102
+ " command_topic" : self .command_topic (),
103
+ "val_tpl" : " {{value_json.POWER}}" ,
69
104
}
70
105
self .logger .debug (
71
106
"%s.announce_config(%s)" ,
72
107
self .name ,
73
108
payload ,
74
- )
109
+ )
75
110
self ._publish (
76
111
self .config_topic (),
77
112
payload = json .dumps (payload ),
78
- )
113
+ )
79
114
80
- def announce_state (self ):
81
- """ sends the MQTT message about the current state """
82
- payload = {' POWER' : self .str_state ()}
115
+ def announce_state (self ) -> None :
116
+ """sends the MQTT message about the current state"""
117
+ payload = {" POWER" : self .str_state ()}
83
118
84
119
self .logger .debug ("%s.announce_state(%s)" , self .name , payload )
85
120
self ._publish (self .state_topic (), payload = json .dumps (payload ))
86
121
87
- def _set_state (self , state ) :
88
- """ Does a few things:
89
- - sets the internal state variable
90
- - sets the GPIO
91
- - announces via MQTT the current state
122
+ def _set_state (self , state : bool ) -> None :
123
+ """Does a few things:
124
+ - sets the internal state variable
125
+ - sets the GPIO
126
+ - announces via MQTT the current state
92
127
"""
93
128
if self .mock_pins :
94
129
if state :
@@ -105,12 +140,16 @@ def _set_state(self, state):
105
140
self .state = state
106
141
self .announce_state ()
107
142
108
- def handle_command (self , payload ) :
109
- """ takes actions based on incoming commands """
143
+ def handle_command (self , payload : bytes ) -> None :
144
+ """takes actions based on incoming commands"""
110
145
self .logger .debug ("%s.handle_command(%s)" , self .name , payload )
111
- if payload == b'ON' :
146
+ if payload == b"ON" :
112
147
self ._set_state (True )
113
- elif payload == b' OFF' :
148
+ elif payload == b" OFF" :
114
149
self ._set_state (False )
115
150
else :
116
- self .logger .WARN ("%s.handle_command(%s) is weird - should match '(ON|OFF)'" , self .name , payload ) # pylint: disable=line-too-long
151
+ self .logger .warning (
152
+ "%s.handle_command(%s) is weird - should match '(ON|OFF)'" ,
153
+ self .name ,
154
+ payload ,
155
+ ) # pylint: disable=line-too-long
0 commit comments