-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPlot_Acquired_Data.py
86 lines (72 loc) · 2.38 KB
/
Plot_Acquired_Data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import matplotlib
matplotlib.use('TkAgg')
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import socket
import threading
import sys
import os
# The `defaults write` preference below only applies to OS-X, so run it only
# there instead of spawning a shell command on every platform.
if sys.platform == 'darwin':
    os.system('defaults write org.python.python ApplePersistenceIgnoreState NO')

WindowSize = 5000        # number of samples held in the display window
SampleRate = 1000.0      # divides a sample index to give the x value in seconds
VoltsPerBit = 2.5/256    # scale factor applied to each raw sample before plotting

# Define global variables
data = []                          # samples received but not yet plotted
displayData = [-2] * WindowSize    # plotted window, pre-filled with placeholder samples
# This reads from a socket.
def data_listener():
    """Receive UDP datagrams forever and queue their contents in ``data``.

    Binds a UDP socket to ``(UDP_IP, 9000)`` and blocks in ``recvfrom``.
    Every element of each received payload is appended to the module-level
    ``data`` list. The loop has no exit condition, so the function only
    ends when a socket call raises; the socket is closed in that case.
    """
    UDP_PORT = 9000
    sock = socket.socket(socket.AF_INET,     # Internet
                         socket.SOCK_DGRAM)  # UDP
    try:
        sock.bind((UDP_IP, UDP_PORT))
        while True:
            newdata, _ = sock.recvfrom(1024)  # buffer size is 1024 bytes
            # Look `data` up by name on every pass rather than caching it in
            # a local: the plotting code may rebind the global between
            # datagrams. Extending directly avoids a throwaway copy.
            data.extend(newdata)
    finally:
        # Release the descriptor if bind() or recvfrom() fails.
        sock.close()
# Handle command line arguments to get IP address.
# Exactly one argument (the IP address) is required; otherwise print usage.
if len(sys.argv) != 2:
    sys.exit('EMG_Acquire <Target IP Address>')

UDP_IP = sys.argv[1]
try:
    # inet_aton is used purely as a validator; its return value is discarded.
    socket.inet_aton(UDP_IP)
except (socket.error, ValueError):
    # Catch only the failures of the address check itself, so unrelated
    # errors (and KeyboardInterrupt) are no longer reported as a bad address.
    sys.exit('Invalid IP address, Try again')
# Create the module-level UDP socket (IPv4, datagram) and record the port
# number. The socket is only created here; it is not bound or connected.
UDP_PORT = 9000
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
# Setup plot parameters: one figure with a single axes and an empty red line
# whose data is filled in later by init() and animate().
fig, ax = plt.subplots()
(line,) = ax.plot([], '-r')  # red line, no points

# The x axis spans the whole display window, expressed in seconds.
ax.set_xlim([0, WindowSize/SampleRate])
# The y axis is symmetric around zero: 128 sample steps on either side.
ax.set_ylim([-VoltsPerBit*128, VoltsPerBit*128])

ax.set_xlabel('Time (Seconds)')
ax.set_ylabel('EMG (mV)')
# This function updates what data is displayed. It is passed to
# animation.FuncAnimation, which calls it once per frame.
def animate(i):
    """Shift newly received samples into the display window and redraw.

    Args:
        i: Frame value supplied by ``FuncAnimation``; not used.

    Returns:
        A one-element tuple containing the updated line artist.
    """
    global displayData
    # data_listener appends to `data` from another thread. Snapshot only the
    # samples present right now and delete exactly that prefix, so anything
    # appended in the meantime stays queued for the next frame instead of
    # being discarded by resetting the whole list.
    pending = len(data)
    newData = data[:pending]
    del data[:pending]
    # Keep just the newest WindowSize samples so the y data always has the
    # same length as the x data set in init(), even after a backlog larger
    # than the window has built up.
    displayData = (displayData + newData)[-WindowSize:]
    line.set_ydata([sample*VoltsPerBit-1.25 for sample in displayData])
    return line,
# Init only required for blitting to give a clean slate.
def init():
    """Give the line its x values and its initial y values.

    The x values are the sample indices 0..WindowSize-1 divided by
    SampleRate; the y values are a copy of the current ``displayData``.

    Returns:
        A one-element tuple containing the line artist.
    """
    seconds = [index/SampleRate for index in range(WindowSize)]
    initial_samples = list(displayData)
    line.set_xdata(seconds)
    line.set_ydata(initial_samples)
    return (line,)
print('Connected to ', str(UDP_IP))
print("Listening for incoming messages...")
print('Close Plot Window to exit')

# Listen for UDP data on a background thread. It is a daemon thread, so it
# does not keep the process alive once the main thread finishes.
thread = threading.Thread(target=data_listener, daemon=True)
thread.start()

# Update the plot with acquired data: init() prepares the line and animate()
# is called for each frame, with a 25 ms interval between frames.
# The animation object is kept in `ani` so that it stays referenced.
ani = animation.FuncAnimation(
    fig,
    animate,
    frames=np.arange(1, 200),
    init_func=init,
    interval=25,
    blit=True,
)

# Show the plot
plt.show()