-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil_fun.py
239 lines (187 loc) · 7.19 KB
/
util_fun.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
import numpy as np
from bleak import BleakScanner
from re import match as re_match
from asyncio import Queue
from constants import *
# import csv
# from json import dumps
# from os.path import exists
# import datetime
async def scan_movesense_address(timeout = 5.0):
'''
Scan for ble Movesense devices using BleakScanner's discover method for a certain time
Args:
timeout: A float to set the descovering time
Returns:
list: A list of strings with each element [device.address, device.name]
Example:
>>> list = scan_movesense_address()
[0C:8C:DC:41:DB:EB, Movesense 223430000019]
'''
if timeout <= 0:
raise ValueError("Timeout cannot be negative.")
devices = await BleakScanner.discover(timeout)
# print(devices)
movesense_devices = []
if devices is not None:
for d in devices:
# Store movesense ble devices
if d.name and d.name.startswith(DEVICENAME):
movesense_devices.append([str(d.address), str(d.name)])
if not movesense_devices:
raise NameError("No movesense devices.")
return movesense_devices
else :
raise NameError("No devise found.")
def is_valid_mac_address(mac : str):
'''
Validate the the given string has a mac address format (use of ':')
Args:
mac: A string type address
Returns:
bool: If the given str matches True else False
Example:
>>> bool = is_valid_mac_address(0C:8C:DC:41:DB:EB)
True
'''
# Define a regular expression pattern for a valid MAC address
mac_pattern = r'^([0-9A-Fa-f]{2}[:]){5}([0-9A-Fa-f]{2})$'
# Use the re.match() function to check if the string matches the pattern
return bool(re_match(mac_pattern, mac))
def is_valid_request(request : str, hz = int):
'''
Check that the given strings has is one of the valid requests and return the full request to
write to a movesense device (/meas/request/hz)
Args:
request: A string for the request type
hz: An int for the tick rate (if needed)
Returns:
path: string with the full request
False: Bool if wrong request given
Example:
>>> request = is_valid_request("ecg",125)
"/meas/ecg/125"
>>> request = is_valid_request("imu9m",11111111111)
False
'''
if request.lower() in MAGI_REQUEST_TYPES:
if hz in MAGI_SAMPLE_RATES:
path = "/meas/" + str(request) + "/" + str(hz)
return path
else:
return False
elif request.lower() == ECG_REQUEST_TYPE:
if hz in ECG_SAMPLE_RATES:
path = "/meas/" + str(request) + "/" + str(hz)
return path
else:
return False
elif request.lower() == HR_REQUEST_TYPE:
path = "/meas/" + str(request)
return path
else:
return False
async def magi_data_format(queue : Queue()):
'''
Pop out one element from the input queue and formats it into a structured NumPy array.
The array is structured to [timestamp, x(1_n), y(1_n), z(1_n)] format.
For rate 13 Hz, the format is [timestamp, x1, y1, z1], for rate 26 the structure is [timestamp, x1, x, y1, y2, z1, z2] etc.
Args:
queue: The asyncio fifo Queue that holds the stored data
Returns:
magi_data: The np array holding the magi data.
Example:
>>> data = await magi_data_format(queue)
[ time, data ]
'''
dt = np.dtype([('timestamp', np.uint32), ('elements', np.ndarray)])
# magi_data = np.array([], np.ndarray)
data = await queue.get()
if data is None:
return None
data_xyz = np.array(data[1:], dtype = np.float32)
# magi_data = np.array([ ( data[0], data_xyz )], dtype = dt)
# magi_data = np.append(magi_data, full_data)
return np.array([ ( data[0], data_xyz )], dtype = dt)
async def ecg_data_format(queue : Queue()):
# async def ecg_data_format(queue : Queue(), store_time = True):
'''
Pop out one element from the input queue and format it into a structured NumPy array.
The array is structured to [timestamp, ecg(1-16)] format.
Args:
queue: The asyncio fifo Queue that holds the stored data
store_time: Boolean to store timestump
Returns:
ecg_data: The np array that holding the ecg data.
Example:
>>> data = await ecg_data_format(queue)
[ time, ecg1, ... , ecg16 ]
>>> data = await ecg_data_format(queue, False)
[ ecg1, ... , ecg16 ]
'''
ecg_data = np.array([], dtype = np.float32)
# start_time = store_time
data = await queue.get()
if data is None:
return None
start_time = data[0]
ecg_data = np.array(data[1:])
# if store_time:
# return [start_time, ecg_data]
# else:
# return ecg_data
return [start_time, ecg_data]
async def hr_data_format(queue : Queue()):
'''
Pop out one element from the input queue and format it into a structured NumPy array.
The array is structured to [heart rate, RR interval] format.
Args:
queue: The asyncio fifo Queue that holds the stored data
Returns:
hr_data: The np array that holding the hr data.
Example:
>>> data = await hr_data_format(queue)
[ 65.1, 923]
'''
dt = np.dtype([ ('beat_rate', np.float32), ('RR_int', np.uint16)])
data = await queue.get()
if data is None:
return data
data = np.array([(data[0],data[1])], dtype=dt)
return data
async def temp_data_format(queue : Queue):
'''
Pop out one element from the input queue and format it into a structured NumPy array.
The array is structured to [timestamp, temp] format.
Args:
queue: The asyncio fifo Queue that holds the stored data
Returns:
temp_data: The np array that holding the temp data.
Example:
>>> data = await hr_data_format(queue)
[ 12124, 300]
'''
data = await queue.get()
return data
async def ecg_from_queue(queue: Queue, save_timestamps: bool = False):
'''
Helper function to pop and store all the ecg data from the current queue
Args:
queue: The asyncio fifo Queue that holds the stored data
Returns:
data: The np array that holding the data.
Example:
>>> data = await get_all_ecg_from_queue(queue)
[ , 300]
'''
# set up data format
ecg_data = np.array([],dtype=np.float32)
timestamps = np.array([], dtype=np.uint32)
while queue.qsize() > 0:
temp_timestamp, temp_ecg_data = await ecg_data_format(queue)
ecg_data = np.append(ecg_data, np.array(temp_ecg_data, dtype=np.float32))
timestamps = np.append(timestamps, np.uint32(temp_timestamp))
if save_timestamps == True:
return {'timestamps' : timestamps, 'ecg_data' : ecg_data}
else:
return ecg_data