|
| 1 | +import subprocess |
| 2 | +import re |
| 3 | +import time |
| 4 | +from RPLCD.i2c import CharLCD |
| 5 | + |
| 6 | +# Define digit pairs from 00 to 61 (yes 61 because of leap seconds) |
| 7 | +digits = [ |
| 8 | + [24341, 25351], [24120, 25120], [24161, 25370], [24161, 25171], [24301, 25141], |
| 9 | + [24360, 25171], [24360, 25371], [24141, 25101], [24361, 25371], [24341, 25141], |
| 10 | + [2241, 2251], [2020, 2020], [2061, 2270], [2061, 2071], [2201, 2041], |
| 11 | + [2260, 2071], [2260, 2271], [2041, 2001], [2261, 2271], [2241, 2041], |
| 12 | + [6341, 27251], [6120, 27020], [6161, 27270], [6161, 27071], [6301, 27041], |
| 13 | + [6360, 27071], [6360, 27271], [6141, 27001], [6361, 27271], [6341, 27041], |
| 14 | + [6341, 7351], [6120, 7120], [6161, 7370], [6161, 7171], [6301, 7141], |
| 15 | + [6360, 7171], [6360, 7371], [6141, 7101], [6361, 7371], [6341, 7141], |
| 16 | + [20341, 4351], [20120, 4120], [20161, 4370], [20161, 4171], [20301, 4141], |
| 17 | + [20360, 4171], [20360, 4371], [20141, 4101], [20361, 4371], [20341, 4141], |
| 18 | + [26241, 7351], [26020, 7120], [26061, 7370], [26061, 7171], [26201, 7141], |
| 19 | + [26260, 7171], [26260, 7371], [26041, 7101], [26261, 7371], [26241, 7141], |
| 20 | + [26241, 27351], [26020, 27120]] |
| 21 | + |
| 22 | + |
| 23 | +# TODO: Don't forget to add the rplcd user to i2c group |
| 24 | +class LCDScreen(object): |
| 25 | + |
| 26 | + def __init__(self, welcome_text): |
| 27 | + self.lcd = CharLCD(i2c_expander='PCF8574', address=0x27, port=1, |
| 28 | + cols=16, rows=2, dotsize=8, |
| 29 | + charmap='A02', |
| 30 | + auto_linebreaks=True, |
| 31 | + backlight_enabled=True) |
| 32 | + |
| 33 | + self.lcd.backlight_enabled = True |
| 34 | + self.lcd.clear() |
| 35 | + self.lcd.write_string(welcome_text) |
| 36 | + |
| 37 | + # Create some custom characters |
| 38 | + self.lcd.create_char(0, (0, 0, 0, 0, 0, 0, 0, 0)) |
| 39 | + self.lcd.create_char(1, (16, 24, 24, 24, 24, 24, 24, 16)) |
| 40 | + self.lcd.create_char(2, (1, 3, 3, 3, 3, 3, 3, 1)) |
| 41 | + self.lcd.create_char(3, (17, 27, 27, 27, 27, 27, 27, 17)) |
| 42 | + self.lcd.create_char(4, (31, 31, 0, 0, 0, 0, 0, 0)) |
| 43 | + self.lcd.create_char(5, (0, 0, 0, 0, 0, 0, 31, 31)) |
| 44 | + self.lcd.create_char(6, (31, 31, 0, 0, 0, 0, 0, 31)) |
| 45 | + self.lcd.create_char(7, (31, 0, 0, 0, 0, 0, 31, 31)) |
| 46 | + |
| 47 | + self.lastScreenTime = time.time() # time since last screen switch |
| 48 | + self.prevStr = "" |
| 49 | + |
| 50 | + self.screens = ( |
| 51 | + (self.big_time_view, 6), |
| 52 | + # Rotating list of views on LCD with how many seconds to display each display. Round robin style. |
| 53 | + #(self.connectedUserView, 4), |
| 54 | + #(self.big_time_view, 6), |
| 55 | + (self.precision_view, 4), |
| 56 | + (self.big_time_view, 6), |
| 57 | + (self.ntptime_info, 5), |
| 58 | + (self.big_time_view, 6), |
| 59 | + (self.clockperf_view, 5), |
| 60 | + (self.get_hostname, 6), |
| 61 | + (self.get_current_ip, 6), |
| 62 | + ) # list of all views for rotation |
| 63 | + self.nrofscreens = len(self.screens) |
| 64 | + self.currentScreen = 0 |
| 65 | + |
| 66 | + def write_lcd(self, s): |
| 67 | + """Checks the string, if different than last call, update screen. |
| 68 | +
|
| 69 | + :param s: |
| 70 | + :return: |
| 71 | + """ |
| 72 | + |
| 73 | + if self.prevStr != s: # Oh what a shitty way around actually learning the ins and outs of encoding chars... |
| 74 | + # Display string has changed, update LCD |
| 75 | + self.lcd.clear() |
| 76 | + self.lcd.write_string(s) |
| 77 | + self.prevStr = s # Save so we can test if screen changed between calls, don't update if not needed to reduce LCD flicker |
| 78 | + |
| 79 | + def big_time_view(self): |
| 80 | + """Shows custom large local time on LCD""" |
| 81 | + |
| 82 | + now = time.localtime() |
| 83 | + hrs = int(time.strftime("%H")) |
| 84 | + minutes = int(time.strftime("%M")) |
| 85 | + sec = int(time.strftime("%S")) |
| 86 | + |
| 87 | + # Build string representing top and bottom rows |
| 88 | + L1 = "0" + str(digits[hrs][0]).zfill(5) + str(digits[minutes][0]).zfill(5) + str(digits[sec][0]).zfill(5) |
| 89 | + L2 = "0" + str(digits[hrs][1]).zfill(5) + str(digits[minutes][1]).zfill(5) + str(digits[sec][1]).zfill(5) |
| 90 | + |
| 91 | + # Convert strings from digits into pointers to custom character |
| 92 | + i = 0 |
| 93 | + XL1 = "" |
| 94 | + XL2 = "" |
| 95 | + while i < len(L1): |
| 96 | + XL1 = XL1 + chr(int(L1[i])) |
| 97 | + XL2 = XL2 + chr(int(L2[i])) |
| 98 | + i += 1 |
| 99 | + self.write_lcd(XL1 + "\r" + XL2) |
| 100 | + # return XL1 + "\r" + XL2 |
| 101 | + |
| 102 | + def precision_view(self): |
| 103 | + """Calculate and display the NTPD accuracy""" |
| 104 | + try: |
| 105 | + output = subprocess.check_output("ntpq -c rv", shell=True) |
| 106 | + returncode = 0 |
| 107 | + except subprocess.CalledProcessError as e: |
| 108 | + output = e.output |
| 109 | + returncode = e.returncode |
| 110 | + print(returncode) |
| 111 | + exit(1) |
| 112 | + |
| 113 | + precision = "" |
| 114 | + clkjitter = "" |
| 115 | + clkwander = "" |
| 116 | + theStr = "" |
| 117 | + searchResult = re.search(r'precision=(.*?),', output.decode('utf-8'), re.S) |
| 118 | + precision = searchResult.group(1) |
| 119 | + searchResult = re.search(r'.*clk_jitter=(.*?),', output.decode('utf-8'), re.S) |
| 120 | + clk_jitter = searchResult.group(1) |
| 121 | + if precision and clk_jitter: |
| 122 | + precision = (1 / 2.0 ** abs(float(precision))) * 1000000.0 |
| 123 | + theStr = "Prec: {:.5f} {}s\r".format(precision, chr(0xE4)) |
| 124 | + theStr += "ClkJit: {:>4} ms".format(clk_jitter) |
| 125 | + else: |
| 126 | + theStr = "Error: No\rPrecision data" |
| 127 | + self.write_lcd(theStr) |
| 128 | + # return theStr |
| 129 | + |
| 130 | + def ntptime_info(self): |
| 131 | + """Statistics from ntptime command""" |
| 132 | + try: |
| 133 | + output = subprocess.check_output("ntptime", shell=True) |
| 134 | + except subprocess.CalledProcessError as e: |
| 135 | + output = e.output |
| 136 | + returncode = e.returncode |
| 137 | + print(returncode) |
| 138 | + precision = re.search(r'precision (.* us).*tolerance (.* ppm)', output.decode('utf-8'), re.M | re.S) |
| 139 | + if precision: |
| 140 | + theStr = "Precis: {:>8}\r".format(precision.group(1)) |
| 141 | + theStr += "Stabi: {:>9}".format(precision.group(2)) |
| 142 | + else: |
| 143 | + theStr = "No info\nError" |
| 144 | + self.write_lcd(theStr) |
| 145 | + |
| 146 | + def clockperf_view(self): |
| 147 | + """Shows jitter etc""" |
| 148 | + output = subprocess.check_output("ntptime", shell=True) |
| 149 | + search = re.search(r'TAI offset.*offset (.*? us).*jitter (.* us)', output.decode('utf-8'), re.M | re.S) |
| 150 | + if search: |
| 151 | + theStr = "Offset: {:>8}\r".format(search.group(1)) |
| 152 | + theStr += "OSjitt: {:>8}".format(search.group(2)) |
| 153 | + else: |
| 154 | + theStr = "No offset\rinfo error" |
| 155 | + self.write_lcd(theStr) |
| 156 | + |
| 157 | + def connectedUserView(self): |
| 158 | + """Shows connected clients to ntpd""" |
| 159 | + highestCount = "NaN" |
| 160 | + try: |
| 161 | + output = subprocess.check_output("ntpdc -n -c monlist | awk '{if(NR>2)print $1}' | uniq | wc -l", |
| 162 | + shell=True) # Gets all the connected clients from ntp |
| 163 | + except subprocess.CalledProcessError as e: |
| 164 | + output = e.output |
| 165 | + returncode = e.returncode |
| 166 | + print(returncode) |
| 167 | + |
| 168 | + try: |
| 169 | + highestCount = subprocess.check_output( |
| 170 | + "ntpdc -n -c monlist | awk '{if(NR>2)print $4}' | sort -nrk1,1 | line", |
| 171 | + shell=True) # Gets the highest connections from connected clients |
| 172 | + except subprocess.CalledProcessError as e: |
| 173 | + output = e.output |
| 174 | + returncode = e.returncode |
| 175 | + print(returncode) |
| 176 | + |
| 177 | + theStr = "Con users: {:>6}".format(output.decode('utf-8')) |
| 178 | + theStr += "Hi cons: {:>8}".format(highestCount) |
| 179 | + self.write_lcd(theStr) |
| 180 | + |
| 181 | + def get_hostname(self): |
| 182 | + import socket |
| 183 | + hostname = socket.getfqdn() |
| 184 | + if hostname: |
| 185 | + self.write_lcd(hostname) |
| 186 | + else: |
| 187 | + self.write_lcd('No hostname') |
| 188 | + |
| 189 | + def get_current_ip(self): |
| 190 | + import socket |
| 191 | + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| 192 | + s.connect(('1.1.1.1', 1)) # connect() for UDP doesn't send packets |
| 193 | + local_ip_address = s.getsockname()[0] |
| 194 | + self.write_lcd(local_ip_address) |
| 195 | + |
| 196 | + def update_lcd(self): |
| 197 | + if time.time() - self.lastScreenTime > self.screens[self.currentScreen][1]: # Time to switch display |
| 198 | + self.currentScreen = self.currentScreen + 1 |
| 199 | + self.lastScreenTime = time.time() # reset screen timer |
| 200 | + if self.currentScreen > self.nrofscreens - 1: |
| 201 | + self.currentScreen = 0 |
| 202 | + self.screens[self.currentScreen][0]() |
0 commit comments