-
Notifications
You must be signed in to change notification settings - Fork 0
/
task_share.py
452 lines (356 loc) · 16.4 KB
/
task_share.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
"""!
@file task_share.py
This file contains classes which allow tasks to share data without the risk
of data corruption by interrupts.
@author JR Ridgely
@date 2017-Jan-01 JRR Approximate date of creation of file
@date 2021-Dec-18 JRR Docstrings changed to work without DoxyPyPy
@copyright This program is copyright (c) 2017-2021 by JR Ridgely and released
under the GNU Public License, version 3.0.
It is intended for educational use only, but its use is not limited thereto.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
import array
import gc
import pyb
import micropython
## This is a system-wide list of all the queues and shared variables. It is
# used to create diagnostic printouts.
share_list = []
## This dictionary allows readable printouts of queue and share data types.
type_code_strings = {'b' : "int8", 'B' : "uint8",
'h' : "int16", 'H' : "uint16",
'i' : "int(?)", 'I' : "uint(?)",
'l' : "int32", 'L' : "uint32",
'q' : "int64", 'Q' : "uint64",
'f' : "float", 'd' : "double"}
def show_all ():
"""!
Create a string holding a diagnostic printout showing the status of
each queue and share in the system.
@return A string containing information about each queue and share
"""
gen = (str (item) for item in share_list)
return '\n'.join (gen)
# ============================================================================
class BaseShare:
"""!
Base class for queues and shares which exchange data between tasks.
One should never create an object from this class; it doesn't do anything
useful. It exists to implement things which are common between its child
classes @c Queue and @c Share.
"""
def __init__ (self, type_code, thread_protect = True, name = None):
"""!
Create a base queue object when called by a child class initializer.
This method creates the things which queues and shares have in common.
"""
self._type_code = type_code
self._thread_protect = thread_protect
# Add this queue to the global share and queue list
share_list.append (self)
# ============================================================================
class Queue (BaseShare):
"""!
A queue which is used to transfer data from one task to another.
If parameter 'thread_protect' is @c True when a queue is created, transfers
of data will be protected from corruption in the case that one task might
interrupt another due to use in a pre-emptive multithreading environment or
due to one task being run as an interrupt service routine.
An example of the creation and use of a queue is as follows:
@code
import task_share
# This queue holds unsigned short (16-bit) integers
my_queue = task_share.Queue ('H', 100, name="My Queue")
# Somewhere in one task, put data into the queue
my_queue.put (some_data)
# In another task, read data from the queue
something = my_queue.get ()
@endcode
"""
## A counter used to give serial numbers to queues for diagnostic use.
ser_num = 0
def __init__ (self, type_code, size, thread_protect = False,
overwrite = False, name = None):
"""!
Initialize a queue object to carry and buffer data between tasks.
This method sets up a queue by allocating memory for the contents and
setting up the components in an empty configuration.
Each queue can only carry data of one particular type which must be
chosen from the following list. The data type is specified by a
one-letter type code which is given as for the Python @c array.array
type, which can be any of the following:
| | | |
|:-----|:-----|:-----|
| **b** (signed char) | **B** (unsigned char) | 8 bit integers |
| **h** (signed short) | **H** (unsigned short) | 16 bit integers |
| **i** (signed int) | **I** (unsigned int) | 32 bit integers (probably) |
| **l** (signed long) | **L** (unsigned long) | 32 bit integers |
| **q** (signed long long) | **Q** (unsigned long long) | 64 bit integers |
| **f** (float) | **d** (double-precision float) | |
@param type_code The type of data items which the queue can hold
@param size The maximum number of items which the queue can hold
@param thread_protect @c True if mutual exclusion protection is used
@param overwrite If @c True, oldest data will be overwritten with new
data if the queue becomes full
@param name A short name for the queue, default @c QueueN where @c N
is a serial number for the queue
"""
# First call the parent class initializer
super ().__init__ (type_code, thread_protect, name)
self._size = size
self._overwrite = overwrite
self._name = str (name) if name != None \
else 'Queue' + str (Queue.ser_num)
Queue.ser_num += 1
# Allocate memory in which the queue's data will be stored
try:
self._buffer = array.array (type_code, range (size))
except MemoryError:
self._buffer = None
raise
except ValueError:
self._buffer = None
raise
# Initialize pointers to be used for reading and writing data
self.clear ()
# Since we may have allocated a bunch of memory, call the garbage
# collector to neaten up what memory is left for future use
gc.collect ()
@micropython.native
def put (self, item, in_ISR = False):
"""!
Put an item into the queue.
If there isn't room for the item, wait (blocking the calling process)
until room becomes available, unless the @c overwrite constructor
parameter was set to @c True to allow old data to be clobbered. If
non-blocking behavior without overwriting is needed, one should call
@c full() to ensure that the queue is not full before putting data
into it:
@code
| def some_task ():
| # Setup
| while True:
| if not my_queue.full ():
| my_queue.put (create_something_to_put ())
| yield 0
@endcode
@param item The item to be placed into the queue
@param in_ISR Set this to @c True if calling from within an ISR
"""
# If we're in an ISR and the queue is full and we're not allowed to
# overwrite data, we have to give up and exit
if self.full ():
if in_ISR:
return
# Wait (if needed) until there's room in the buffer for the data
if not self._overwrite:
while self.full ():
pass
# Prevent data corruption by blocking interrupts during data transfer
if self._thread_protect and not in_ISR:
_irq_state = pyb.disable_irq ()
# Write the data and advance the counts and pointers
self._buffer[self._wr_idx] = item
self._wr_idx += 1
if self._wr_idx >= self._size:
self._wr_idx = 0
self._num_items += 1
if self._num_items >= self._size: # Can't be fuller than full
self._num_items = self._size
if self._num_items > self._max_full: # Record maximum fillage
self._max_full = self._num_items
# Re-enable interrupts
if self._thread_protect and not in_ISR:
pyb.enable_irq (_irq_state)
@micropython.native
def get (self, in_ISR = False):
"""!
Read an item from the queue.
If there isn't anything in there, wait (blocking the calling process)
until something becomes available. If non-blocking reads are needed,
one should call @c any() to check for items before attempting to read
from the queue. This is usually done in a low priority task:
@code
| def some_task ():
| # Setup
| while True:
| if my_queue.any ():
| something = my_queue.get ()
| do_something_with (something)
| # More loop stuff
| yield 0
@endcode
@param in_ISR Set this to @c True if calling from within an ISR
"""
# Wait until there's something in the queue to be returned
while self.empty ():
pass
# Prevent data corruption by blocking interrupts during data transfer
if self._thread_protect and not in_ISR:
irq_state = pyb.disable_irq ()
# Get the item to be returned from the queue
to_return = self._buffer[self._rd_idx]
# Move the read pointer and adjust the number of items in the queue
self._rd_idx += 1
if self._rd_idx >= self._size:
self._rd_idx = 0
self._num_items -= 1
if self._num_items < 0:
self._num_items = 0
# Re-enable interrupts
if self._thread_protect and not in_ISR:
pyb.enable_irq (irq_state)
return (to_return)
@micropython.native
def any (self):
"""!
Check if there are any items in the queue.
Returns @c True if there are any items in the queue and @c False
if the queue is empty.
@return @c True if items are in the queue, @c False if not
"""
return (self._num_items > 0)
@micropython.native
def empty (self):
"""!
Check if the queue is empty.
Returns @c True if there are no items in the queue and @c False if
there are any items therein.
@return @c True if queue is empty, @c False if it's not empty
"""
return (self._num_items <= 0)
@micropython.native
def full (self):
"""!
Check if the queue is full.
This method returns @c True if the queue is already full and there
is no room for more data without overwriting existing data.
@return @c True if the queue is full
"""
return (self._num_items >= self._size)
@micropython.native
def num_in (self):
"""!
Check how many items are in the queue.
This method returns the number of items which are currently in the
queue.
@return The number of items in the queue
"""
return (self._num_items)
def clear (self):
"""!
Remove all contents from the queue.
"""
self._rd_idx = 0
self._wr_idx = 0
self._num_items = 0
self._max_full = 0
def __repr__ (self):
"""!
This method puts diagnostic information about the queue into a string.
It shows the queue's name and type as well as the maximum number of
items and queue size.
"""
return ('{:<12s} Queue<{:s}> Max Full {:d}/{:d}'.format (self._name,
type_code_strings[self._type_code], self._max_full, self._size))
# ============================================================================
class Share (BaseShare):
"""!
An item which holds data to be shared between tasks.
This class implements a shared data item which can be protected against
data corruption by pre-emptive multithreading. Multithreading which can
corrupt shared data includes the use of ordinary interrupts as well as the
use of pre-emptive multithreading such as by a Real-Time Operating System
(RTOS).
An example of the creation and use of a share is as follows:
@code
import task_share
# This share holds a signed short (16-bit) integer
my_share = task_share.Queue ('h', name="My Share")
# Somewhere in one task, put data into the share
my_share.put (some_data)
# In another task, read data from the share
something = my_share.get ()
@endcode
"""
## A counter used to give serial numbers to shares for diagnostic use.
ser_num = 0
def __init__ (self, type_code, thread_protect = True, name = None):
"""!
Create a shared data item used to transfer data between tasks.
This method allocates memory in which the shared data will be buffered.
Each share can only carry data of one particular type which must be
chosen from the following list. The data type is specified by a
one-letter type code which is given as for the Python @c array.array
type, which can be any of the following:
| | | |
|:-----|:-----|:-----|
| **b** (signed char) | **B** (unsigned char) | 8 bit integers |
| **h** (signed short) | **H** (unsigned short) | 16 bit integers |
| **i** (signed int) | **I** (unsigned int) | 32 bit integers (probably) |
| **l** (signed long) | **L** (unsigned long) | 32 bit integers |
| **q** (signed long long) | **Q** (unsigned long long) | 64 bit integers |
| **f** (float) | **d** (double-precision float) | |
@param type_code The type of data items which the share can hold
@param thread_protect True if mutual exclusion protection is used
@param name A short name for the share, default @c ShareN where @c N
is a serial number for the share
"""
# First call the parent class initializer
super ().__init__ (type_code, thread_protect, name)
self._buffer = array.array (type_code, [0])
self._name = str (name) if name != None \
else 'Share' + str (Share.ser_num)
Share.ser_num += 1
@micropython.native
def put (self, data, in_ISR = False):
"""!
Write an item of data into the share.
This method puts data into the share; any old data is overwritten.
This code disables interrupts during the writing so as to prevent
data corrupting by an interrupt service routine which might access
the same data.
@param data The data to be put into this share
@param in_ISR Set this to True if calling from within an ISR
"""
# Disable interrupts before writing the data
if self._thread_protect and not in_ISR:
irq_state = pyb.disable_irq ()
self._buffer[0] = data
# Re-enable interrupts
if self._thread_protect and not in_ISR:
pyb.enable_irq (irq_state)
@micropython.native
def get (self, in_ISR = False):
"""!
Read an item of data from the share.
If thread protection is enabled, interrupts are disabled during the time
that the data is being read so as to prevent data corruption by changes
in the data as it is being read.
@param in_ISR Set this to True if calling from within an ISR
"""
# Disable interrupts before reading the data
if self._thread_protect and not in_ISR:
irq_state = pyb.disable_irq ()
to_return = self._buffer[0]
# Re-enable interrupts
if self._thread_protect and not in_ISR:
pyb.enable_irq (irq_state)
return (to_return)
def __repr__ (self):
"""!
Puts diagnostic information about the share into a string.
Shares are pretty simple, so we just put the name and type.
"""
return ("{:<12s} Share<{:s}>".format (self._name,
type_code_strings[self._type_code]))