|
| 1 | +from typing import override as _override |
| 2 | + |
| 3 | +from can import Message as _Message |
| 4 | + |
| 5 | +from leads import DataContainer as _DataContainer |
| 6 | +from leads_can.prototype import CANBus |
| 7 | + |
| 8 | + |
| 9 | +class OBD2(CANBus): |
| 10 | + @_override |
| 11 | + def write(self, payload: _DataContainer) -> None: |
| 12 | + t = payload.time_stamp() * .001 |
| 13 | + super().write(_Message(t, 0x00, data=str(payload.voltage).encode())) |
| 14 | + super().write(_Message(t, 0x01, data=str(payload.speed).encode())) |
| 15 | + super().write(_Message(t, 0x10, data=str(payload.front_wheel_speed).encode())) |
| 16 | + super().write(_Message(t, 0x11, data=str(payload.rear_wheel_speed).encode())) |
| 17 | + super().write(_Message(t, 0x20, data=str(payload.forward_acceleration).encode())) |
| 18 | + super().write(_Message(t, 0x21, data=str(payload.lateral_acceleration).encode())) |
| 19 | + super().write(_Message(t, 0x30, data=str(payload.mileage).encode())) |
| 20 | + super().write(_Message(t, 0x40, data=str(payload.gps_ground_speed).encode(), |
| 21 | + is_error_frame=not payload.gps_valid)) |
| 22 | + super().write(_Message(t, 0x41, data=str(payload.latitude).encode(), |
| 23 | + is_error_frame=not payload.gps_valid)) |
| 24 | + super().write(_Message(t, 0x42, data=str(payload.longitude).encode(), |
| 25 | + is_error_frame=not payload.gps_valid)) |
| 26 | + super().write(_Message(t, 0x50, data=str(payload.throttle).encode())) |
| 27 | + super().write(_Message(t, 0x51, data=str(payload.brake).encode())) |
0 commit comments