1- from enum import Enum
1+ import asyncio
2+ import logging
3+ import os
24
35import bleak .backends .bluezdbus .defs as defs # type: ignore
46
5- from typing import List , TYPE_CHECKING , Any , Dict
6-
7- from dbus_next .service import ServiceInterface , method , dbus_property # type: ignore
7+ from dbus_next import DBusError # type: ignore
88from dbus_next .constants import PropertyAccess # type: ignore
9+ from dbus_next .service import ServiceInterface , method , dbus_property # type: ignore
910from dbus_next .signature import Variant # type: ignore
11+ from enum import Enum
12+ from typing import List , TYPE_CHECKING , Any , Dict
1013
1114from .descriptor import BlueZGattDescriptor , DescriptorFlags # type: ignore
15+ from .session import NotifySession # type: ignore
1216
1317if TYPE_CHECKING :
1418 from bless .backends .bluezdbus .dbus .service import ( # type: ignore # noqa: F401
1519 BlueZGattService ,
1620 )
1721
22+ logger = logging .getLogger (name = __name__ )
23+
1824
1925class Flags (Enum ):
2026 BROADCAST = "broadcast"
@@ -68,7 +74,8 @@ def __init__(
6874 self ._service : "BlueZGattService" = service # noqa: F821
6975
7076 self ._value : bytes = b""
71- self ._notifying : bool = "notify" in self ._flags or "indicate" in self ._flags
77+ self ._notifying_calls : int = 0
78+ self ._subscribed_centrals : Dict [str , NotifySession ] = {}
7279 self .descriptors : List ["BlueZGattDescriptor" ] = [] # noqa: F821
7380
7481 super (BlueZGattCharacteristic , self ).__init__ (self .interface_name )
@@ -87,17 +94,22 @@ def Value(self) -> "ay": # type: ignore # noqa: F821 N802
8794
8895 @Value .setter # type: ignore
8996 def Value (self , value : "ay" ): # type: ignore # noqa: F821 N802
97+ if isinstance (value , bytearray ):
98+ value = bytes (value )
9099 self ._value = value
91- self .emit_properties_changed (changed_properties = {"Value" : self ._value })
92100
93101 @dbus_property (access = PropertyAccess .READ )
94102 def Notifying (self ) -> "b" : # type: ignore # noqa: F821 N802
95- return self ._notifying
103+ return self ._notifying_calls > 0 or self . NotifyAcquired
96104
97105 @dbus_property (access = PropertyAccess .READ ) # noqa: F722
98106 def Flags (self ) -> "as" : # type: ignore # noqa: F821 F722 N802
99107 return self ._flags
100108
109+ @dbus_property (access = PropertyAccess .READ ) # noqa: F722
110+ def NotifyAcquired (self ) -> "b" : # type: ignore # noqa: F821
111+ return len (self ._subscribed_centrals ) > 0
112+
101113 @method () # noqa: F722
102114 def ReadValue (self , options : "a{sv}" ) -> "ay" : # type: ignore # noqa: F722 F821 N802 E501
103115 """
@@ -137,27 +149,102 @@ def WriteValue(self, value: "ay", options: "a{sv}"): # type: ignore # noqa
137149 raise NotImplementedError ()
138150 f (self , value , options )
139151
152+ @method ()
153+ async def AcquireNotify (self , options : "a{sv}" ) -> "hq" : # type: ignore # noqa
154+ """
155+ Called when a central device subscribes to the
156+ characteristic
157+ """
158+ mtu : int = options ["mtu" ].value
159+ potential_device : Variant = options ["device" ]
160+ device_path : str = potential_device .value
161+
162+ # Can only process this if we are not already subscribed
163+ if self .Notifying and not self .NotifyAcquired :
164+ logger .error ("AcquireNotify attempted after StartNotify called" )
165+ raise DBusError (
166+ "org.bluez.Error.NotPermitted" , "AcquireNotify not permitted"
167+ )
168+
169+ session : NotifySession = NotifySession (
170+ device_path , mtu , self ._service .app .bus , self .ReleaseNotify
171+ )
172+ rx : int = await session .start ()
173+ address : str = await session .get_device_address ()
174+ logger .debug (f"AcquireNotify on { self .UUID } from { address } on FD { rx } " )
175+
176+ f = self ._service .app .StartNotify
177+ if f is None :
178+ raise NotImplementedError ()
179+ f (self , {"device" : address })
180+ self ._subscribed_centrals [address ] = session
181+
182+ async def close_rx ():
183+ logger .debug ("Closing RX" )
184+ await asyncio .sleep (2 )
185+ os .close (rx )
186+ # asyncio.get_running_loop().call_soon_threadsafe(os.close, rx)
187+
188+ asyncio .create_task (close_rx ())
189+ return [rx , mtu ]
190+
191+ async def ReleaseNotify (self , session : NotifySession ):
192+ address : str = await session .get_device_address ()
193+ logger .debug (f"ReleaseNotify on { self .UUID } from { address } " )
194+ f = self ._service .app .StopNotify
195+ if f is None :
196+ raise NotImplementedError ()
197+ f (self , {"device" : address })
198+ del self ._subscribed_centrals [address ]
199+
140200 @method ()
141201 def StartNotify (self ): # noqa: N802
142202 """
143203 Begin a subscription to the characteristic
144204 """
205+ if self .NotifyAcquired :
206+ logger .info (
207+ "StartNotify called. "
208+ + "AcquireNotify already called. "
209+ + "Ignoring call to Start Notify"
210+ )
211+ return
212+
213+ logger .debug (f"StartNotify on { self .UUID } " )
145214 f = self ._service .app .StartNotify
146215 if f is None :
147216 raise NotImplementedError ()
148217 f (self , {})
149- self ._service . app . subscribed_characteristics . append ( self . _uuid )
218+ self ._notifying_calls += 1
150219
151220 @method ()
152- def StopNotify (self ): # noqa: N802
221+ async def StopNotify (self ): # noqa: N802
153222 """
154223 Stop a subscription to the characteristic
155224 """
225+ if self .NotifyAcquired :
226+ logger .error ("StopNotify called but notifications are aquried!" )
227+ return
228+
156229 f = self ._service .app .StopNotify
157230 if f is None :
158231 raise NotImplementedError ()
159232 f (self , {})
160- self ._service .app .subscribed_characteristics .remove (self ._uuid )
233+ self ._notifying_calls -= 1
234+
235+ def update_value (self ) -> None :
236+ """
237+ This method does not actually alter that value of the characteristic,
238+ but rather sends updates to subscribed centrals.
239+ """
240+ if self .NotifyAcquired is True :
241+ for central_id , session in self ._subscribed_centrals .items ():
242+ logger .debug (f"Sending update to { central_id } " )
243+ if not session .send_update (self ._value ):
244+ logger .warn (f"Failed to send update to { central_id } " )
245+
246+ else :
247+ self .emit_properties_changed (changed_properties = {"Value" : self ._value })
161248
162249 async def add_descriptor (
163250 self , uuid : str , flags : List [DescriptorFlags ], value : Any
0 commit comments