-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathNavigationSensors.py
260 lines (217 loc) · 9.61 KB
/
NavigationSensors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
"""NavigationSensors - designed to parse data from devices such as the Microsoft
Kinect and ultrasonic sensors, in order to translate the data into
intuitively acceptable input (such as haptic feedback) for blind individuals.
The ideal use case is as such:
1) We have a vest with haptic feedback motors. We have a Kinect and an
ultrasonic sensor.
2) We parse data from the Kinect into subdivided segments - one
for every motor. For example, a vest with 4 motors would mean
to subdivide our image into 4 squares (likely in a 2x2 formation).
3) We use the ultrasonic sensor to account for the Kinect's minimum range
of 3 feet. The ultrasonic sensor reduces that to 6 cm.
4) We extract features from the Kinect and ultrasonic sensor (such as depth
and change in depth over time), and map these features onto
different vibration motors.
Written by Tushar Singal.
"""
import numpy as np
import freenect
import wiringpi as wp
import datetime
from skimage.util.shape import view_as_blocks
from copy import deepcopy
from threading import Thread, RLock
class Kinect:
"""Designed to parse output from the Kinect to various subdivided segments.
A use case involves haptic feedback vests, in which we can subdivide
the Kinect's output into distinct segments, which we can then map onto
vibration motors throughout the vest.
The Kinect's minimum range is about 3 feet. Sensor fusion is recommended
for nominal operation.
"""
def __init__(self, rows, cols, startStreaming=True, procFunc=None):
"""Processes input from the Kinect and returns parsed data.
Subdivides images from the Kinect into various sub-images, to which
a processing function ```procFunc``` is applied. Streams resultant
output, which can be acquired with ```getValues``` (for all row by col
processed subimages) or ```getValue``` (for a single subimage).
Args:
rows: The number of rows to divide the image into and process.
cols: The number of columns to divide the image into and process.
startStreaming: Whether to start (multithreaded, thread-safe) streaming
and processing upon object initialization. (default: {True})
procFunc: The post-processing function to apply to a sub-image.
Should take arguments of from (subImage, rowIndex, colIndex).
(default: {self._decayingAverage})
"""
self.processed = np.zeros((rows, cols), dtype=tuple)
self.cols = cols
self.rows = rows
self.lock = RLock()
if procFunc is None:
self.procFunc = self._decayingAverage
else:
self.procFunc = procFunc
if startStreaming:
Thread(target=self.stream).start()
def stream(self):
"""Start streaming data from the Kinect.
Begins recording from the Kinect and placing processed values in
self.processed, from which data can be retreived in a thread-safe
value using ```self.getValues``` or ```self.getValue```.
"""
while True:
self._processDepth(self._getRawDepth)
def getValues(self):
"""Retreives the entire matrix of processed values for the last frame.
Returns:
A two-dimensional array of tuples for which every tuple is the value
returned by ```self.procFunc``` when applied to a specific sub-image.
[np.array, dtype=tuple]
"""
self.lock.acquire()
copied = np.copy(self.processed)
self.lock.release()
return copied
def getValue(self, row, col):
"""Retreives a single processed value from the last frame.
Args:
row: The row-index of the sub-image for which processed values are
to be accessed.
col: The column-index of the sub-image for which processed values are
to be accessed.
Returns:
The tuple returned by the computation of ```self.procFunc``` on the subimage
located indexed ```row, col```.
[tuple]
"""
self.lock.acquire()
copied = deepcopy(self.processed[row, col])
self.lock.release()
return copied
def _processDepth(self, depthImage):
"""Processes depth with the provided function.
This function will take the raw depth image output from
```_getRawDepth``` and subdivide the image into ```self.rows``` by
```self.cols``` sub-images.
It then applies ```procFunc``` to each subdivision, and places
the returned tuple of values in ```self.processed```.
Args:
depthImage: The depth-image to process.
procFunc: The post-processing function to apply to a subdivision.
Should return a tuple.
"""
subdivided = view_as_blocks(
depthImage, block_shape=(self.rows, self.cols))
for row in range(self.rows):
for col in range(self.cols):
self.lock.acquire()
self.processed[row, col] = self.procFunc(
subdivided[row, col], row, col)
self.lock.release()
def _getRawDepth(self):
"""Obtains a snapshot of depth data from the Microsoft Kinect.
Returns:
Array of depth data values, ranging from [0-255], where
255 is far from Kinect, and 0 is close. Minimum range of 3 feet.
[Numpy array]
"""
array, _ = freenect.sync_get_depth()
array = array.astype(np.uint8)
return array
def _decayingAverage(self, subImage, row, col, numSamples=4):
""" Returns depth, and difference in depth over time.
Performs a decay-based low-pass filter over depth measurements.
Calculates the difference in depth from the old average. This results
in two-dimensional data: "depth" and "depth derivative", which can be
mapped to, say, "motor intensity" and "motor frequency" in our
haptic-feedback vest example.
Args:
subImage: The subdivision we are calculating over.
row: The width index of the current subdivision.
col: The height index of the current subdivision.
numSamples: Strength of the low-pass filter in number of frames.
(default: {4})
"""
windowAverage = int(np.average(subImage))
oldDistance = self.processed[row, col]
return (decayingAverage(windowAverage, oldDistance, numSamples),
oldDistance - windowAverage)
class Ultrasonic:
def __init__(self, trigPin=4, echoPin=4, numSamples=25, startStreaming=True):
"""Initializes recordings from an ultrasonic sensor.
Allows user to initialize an ultrasonic sensor which has trig and
echo pins. Averages measurements over ```numSamples```.
Args:
trigPin: The WiringPi pin trig is connected to. (default: {4})
echoPin: The WiringPi pin echo is connected to. (default: {4})
numSamples: The number of samples to perform a decaying
average over. (default: {4})
startStreaming: Whether to initialize streaming from the ultrasonic
sensor upon object initialization. (default: {True})
"""
self.trigPin = trigPin
self.echoPin = echoPin
self.numSamples = numSamples
self.lock = RLock()
self.averagedDepth = 90
def setup():
wp.wiringPiSetup()
wp.pinMode(self.trigPin, wp.OUTPUT)
wp.pinMode(self.echoPin, wp.INPUT)
wp.digitalWrite(self.trigPin, wp.LOW)
wp.delay(30)
setup()
if startStreaming():
Thread(target=self.stream).start()
def stream(self):
"""Start streaming data from the ultrasonic sensor.
Begins recording from the ultrasonic sensor and placing
retreived values in ```self.averagedDepth```, which can
be retrieved in a thread-safe manner with ```self.getDepth```.
"""
while True:
self._setDepth()
def getDepth(self):
"""Returns the last retreived depth in centimeters.
Returns:
The last-measured depth value in cm.
[int]
"""
self.lock.acquire()
copy = self.averagedDepth
self.lock.release()
return copy
def _setDepth(self):
"""Records depth from the ultrasonic sensor in centimeters.
Much credit to
https://ninedof.wordpress.com/2013/07/16/rpi-hc-sr04-ultrasonic-sensor-mini-project/
"""
wp.digitalWrite(self.trigPin, wp.HIGH)
wp.delayMicroseconds(20)
wp.digitalWrite(self.trigPin, wp.LOW)
while(wp.digitalRead(self.echoPin) == wp.LOW):
pass
startime = datetime.datetime.now()
while wp.digitalRead(self.echoPin) == wp.HIGH:
pass
traveltime = (datetime.datetime.now() - startime).total_seconds()
distance = 1000000 * traveltime / 58.0
self.lock.acquire()
self.averagedDepth = decayingAverage(
distance, self.averagedDepth, self.numSamples)
self.lock.release()
def decayingAverage(newVal, oldVal, numSamples):
"""Averages oldVal with newVal over numSamples.
Introduces newVal into oldVal, decaying oldVal by
a single sample and introducing newVal by a single sample.
Args:
newVal: The new value to average.
oldVal: The value to decay.
numSamples: The number of samples to decay over.
Returns:
Decayed average.
"""
newAvg = newVal * (1 / numSamples) + oldVal * \
((numSamples - 1) / numSamples)
return newAvg