|
| 1 | +""" |
| 2 | +Code based on: https://github.com/Gozem/am2320/blob/45a20076efb9a19e91bd50f229d1cdd53f1134d4/am2320.py#L1 |
| 3 | +License at time of code reference: MIT https://github.com/Gozem/am2320/blob/45a20076efb9a19e91bd50f229d1cdd53f1134d4/LICENSE#L1 |
| 4 | +""" |
| 5 | + |
| 6 | +import posix |
| 7 | +import time |
| 8 | +from dataclasses import dataclass |
| 9 | +from fcntl import ioctl |
| 10 | + |
| 11 | + |
| 12 | +@dataclass |
| 13 | +class AM2320Reading: |
| 14 | + temperature: float |
| 15 | + humidity: float |
| 16 | + |
| 17 | + def __str__(self): |
| 18 | + return f"Temperature: {self.temperature}, Humidity: {self.humidity}" |
| 19 | + |
| 20 | + |
| 21 | +class AM2320: |
| 22 | + I2C_ADDR = 0x5C |
| 23 | + I2C_SLAVE = 0x0703 |
| 24 | + |
| 25 | + def __init__(self, i2cbus: int = 1): |
| 26 | + self._fd = posix.open("/dev/i2c-%d" % i2cbus, posix.O_RDWR) |
| 27 | + |
| 28 | + ioctl(self._fd, self.I2C_SLAVE, self.I2C_ADDR) |
| 29 | + |
| 30 | + def __del__(self): |
| 31 | + posix.close(self._fd) |
| 32 | + |
| 33 | + @staticmethod |
| 34 | + def _calc_crc16(data): |
| 35 | + crc = 0xFFFF |
| 36 | + for x in data: |
| 37 | + crc = crc ^ x |
| 38 | + for bit in range(8): |
| 39 | + if (crc & 0x0001) == 0x0001: |
| 40 | + crc >>= 1 |
| 41 | + crc ^= 0xA001 |
| 42 | + else: |
| 43 | + crc >>= 1 |
| 44 | + return crc |
| 45 | + |
| 46 | + @staticmethod |
| 47 | + def _combine_bytes(msb, lsb): |
| 48 | + return msb << 8 | lsb |
| 49 | + |
| 50 | + def read_sensor(self) -> AM2320Reading: |
| 51 | + # wake AM2320 up, goes to sleep to not warm up and affect the humidity sensor |
| 52 | + # This write will fail as AM2320 won't ACK this write |
| 53 | + try: |
| 54 | + posix.write(self._fd, b"\0x00") |
| 55 | + except: |
| 56 | + pass |
| 57 | + time.sleep(0.001) # Wait at least 0.8ms, at most 3ms |
| 58 | + |
| 59 | + # write at addr 0x03, start reg = 0x00, num regs = 0x04 */ |
| 60 | + posix.write(self._fd, b"\x03\x00\x04") |
| 61 | + time.sleep(0.0016) # Wait at least 1.5ms for result |
| 62 | + |
| 63 | + # Read out 8 bytes of result data |
| 64 | + # Byte 0: Should be Modbus function code 0x03 |
| 65 | + # Byte 1: Should be number of registers to read (0x04) |
| 66 | + # Byte 2: Humidity msb |
| 67 | + # Byte 3: Humidity lsb |
| 68 | + # Byte 4: Temperature msb |
| 69 | + # Byte 5: Temperature lsb |
| 70 | + # Byte 6: CRC lsb byte |
| 71 | + # Byte 7: CRC msb byte |
| 72 | + data = bytearray(posix.read(self._fd, 8)) |
| 73 | + |
| 74 | + # Check data[0] and data[1] |
| 75 | + if data[0] != 0x03 or data[1] != 0x04: |
| 76 | + raise ValueError("First two read bytes are a mismatch") |
| 77 | + |
| 78 | + # CRC check |
| 79 | + if self._calc_crc16(data[0:6]) != self._combine_bytes(data[7], data[6]): |
| 80 | + raise ValueError("CRC failed") |
| 81 | + |
| 82 | + # Temperature resolution is 16Bit, |
| 83 | + # temperature highest bit (Bit15) is equal to 1 indicates a |
| 84 | + # negative temperature, the temperature highest bit (Bit15) |
| 85 | + # is equal to 0 indicates a positive temperature; |
| 86 | + # temperature in addition to the most significant bit (Bit14 ~ Bit0) |
| 87 | + # indicates the temperature sensor string value. |
| 88 | + # Temperature sensor value is a string of 10 times the |
| 89 | + # actual temperature value. |
| 90 | + temp = self._combine_bytes(data[4], data[5]) |
| 91 | + if temp & 0x8000: |
| 92 | + temp = -(temp & 0x7FFF) |
| 93 | + temp /= 10.0 |
| 94 | + |
| 95 | + humi = self._combine_bytes(data[2], data[3]) / 10.0 |
| 96 | + |
| 97 | + return AM2320Reading( |
| 98 | + temperature=temp, |
| 99 | + humidity=humi, |
| 100 | + ) |
| 101 | + |
| 102 | + |
| 103 | +if __name__ == "__main__": |
| 104 | + am2320 = AM2320(1) |
| 105 | + reading = am2320.read_sensor() |
| 106 | + print(reading) |
0 commit comments