-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathserver.py
executable file
·252 lines (200 loc) · 7.57 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
#!/bin/env python3
from time import time
import asyncio, socket
import ssl
from modbus_exception import modbus_exception
from modbus_functions import func_dict
# Upper bound on the number of bytes requested from a client per read.
# The 7 matches the frame header used in this file (transaction id, protocol
# id, length field and unit id); the 253 is presumably the maximum MODBUS PDU
# size -- TODO confirm against the MODBUS/TCP specification.
MODBUS_TCP_MAX_BUF_LEN = 253 + 7
class modbus_request:
    """Decoded view of one request frame received from a client.

    The frame is split into big-endian header fields followed by the function
    code and the remaining payload:

    * ``tran_id``   -- bytes 0-1
    * ``proto_id``  -- bytes 2-3
    * ``len_field`` -- bytes 4-5
    * ``unit``      -- byte 6
    * ``fcode``     -- byte 7
    * ``data``      -- everything from byte 8 onwards
    """

    def __init__(self, buf):
        self.parse_request(buf)

    def parse_request(self, buf):
        """Populate the request attributes from the raw bytes in ``buf``.

        No length validation happens here: slices that fall outside ``buf``
        are empty and therefore decode to 0.
        """
        def be_uint(start, stop):
            # Decode buf[start:stop] as an unsigned big-endian integer.
            return int.from_bytes(buf[start:stop], byteorder='big')

        self.tran_id = be_uint(0, 2)
        self.proto_id = be_uint(2, 4)
        self.len_field = be_uint(4, 6)
        self.unit = be_uint(6, 7)
        self.fcode = be_uint(7, 8)
        self.data = buf[8:]
class modbus_response:
    """Response fields derived from a request plus the handler's result.

    ``tran_id``, ``proto_id`` and ``unit`` are copied from the request so the
    reply can be matched to it; ``len_field`` is the payload length plus two.
    """

    def __init__(self, req, fcode, data):
        self.build_response(req, fcode, data)

    def build_response(self, req, fcode, data):
        """Fill in the response attributes.

        Args:
            req: the request being answered; its ``tran_id``, ``proto_id``
                and ``unit`` attributes are echoed back.
            fcode: function code to put in the response.
            data: response payload (a bytes-like object).
        """
        # Addressing fields are mirrored from the request.
        self.tran_id, self.proto_id, self.unit = (
            req.tran_id, req.proto_id, req.unit)
        self.fcode, self.data = fcode, data
        # Two extra bytes on top of the payload: unit id and function code.
        self.len_field = len(data) + 2
def check_fcode(fcode):
    """Return True when ``func_dict`` has an entry for ``fcode``, else False."""
    try:
        # The looked-up value is irrelevant; only the KeyError matters.
        func_dict[fcode]
    except KeyError:
        return False
    return True
def check_pdu(fcode, data):
    """Validate a function code and its payload length.

    Returns:
        None when the PDU is acceptable, otherwise the ``modbus_exception``
        member describing the problem:

        * ``ILLEGAL_FUNCTION`` -- ``fcode`` is not handled (see check_fcode);
        * ``ILLEGAL_DATA_ADDRESS`` -- the payload is shorter than 2 bytes;
        * ``ILLEGAL_DATA_VALUE`` -- the payload length does not fit the
          function code.
    """
    if not check_fcode(fcode):
        return modbus_exception.ILLEGAL_FUNCTION

    size = len(data)
    if size < 2:
        return modbus_exception.ILLEGAL_DATA_ADDRESS

    length_ok = (
        # Function codes 0x1-0x6 carry exactly 4 payload bytes.
        (0x0 < fcode <= 0x6 and size == 4)
        # 0xF needs more than 6 payload bytes.
        or (fcode == 0xF and size > 6)
        # 0x10 needs more than 7 payload bytes.
        or (fcode == 0x10 and size > 7)
    )
    return None if length_ok else modbus_exception.ILLEGAL_DATA_VALUE
def have_modbus_exception(req, fcode, error):
    """Build the (function code, data) pair of an exception response.

    Args:
        req: the offending request; not used by this function.
        fcode: function code of the request that failed.
        error: exception code; must provide an int-style ``to_bytes``.

    Returns:
        A tuple ``(fcode + 0x80, error encoded as one big-endian byte)``.
    """
    exception_fcode = 0x80 + fcode
    payload = error.to_bytes(length=1, byteorder='big')
    return exception_fcode, payload
class modbus_client:
    """Per-connection state: the stream pair plus a simple idle stopwatch.

    Attributes:
        reader: stream object the server reads requests from.
        writer: stream object the server writes responses to.
        max_elapsed_time: seconds after which a running stopwatch counts as
            timed out.
        timing_started: whether the stopwatch is currently running.
    """

    def __init__(self, reader, writer, max_elapsed_time=2):
        self.reader, self.writer = reader, writer
        self.max_elapsed_time = max_elapsed_time
        self.timing_started = False

    def start_timing(self):
        """Start (or restart) the stopwatch at the current time."""
        self.begin = time()
        self.timing_started = True

    def stop_timing(self):
        """Stop the stopwatch; ``is_timeout`` reports False until restarted."""
        self.timing_started = False

    def is_timeout(self):
        """Return True if the stopwatch runs and has exceeded the limit."""
        if not self.timing_started:
            return False
        elapsed = time() - self.begin
        return elapsed > self.max_elapsed_time

    def close(self):
        """Close the writer side of the connection."""
        self.writer.close()
class modbus_server:
    """Asyncio MODBUS/TCP server that answers one request per received chunk.

    Every chunk read from a client is treated as a single frame: a 7-byte
    header (transaction id, protocol id, length, unit) followed by the
    function code and its data. Accepted requests are dispatched through
    ``func_dict``; PDUs rejected by ``check_pdu`` are answered with an
    exception response. Subclasses adapt the framing by overriding the
    ``_frame_ready`` / ``_check_header`` / ``_have_*`` / ``_print_client``
    hooks.

    NOTE(review): TCP is a byte stream, so one read may hold a partial frame
    or several frames. Such chunks fail ``_check_header`` and the connection
    is dropped; proper re-framing is not implemented here.
    """

    def __init__(self, host='127.0.0.1', port=5020, unit=1):
        self.host = host
        self.port = port
        self.unit = unit

    def _is_this_unit(self, unit):
        """Return True if ``unit`` is 0 or equals this server's unit id."""
        return unit == 0 or unit == self.unit

    def _frame_ready(self, buf):
        """Return True if ``buf`` holds at least header + function code."""
        return len(buf) >= 8

    def _check_header(self, req):
        """Return True if the request header is acceptable.

        Requires protocol id 0, a length field equal to the payload length
        plus two (unit id and function code), and a unit id this server
        answers to.
        """
        return (
            req.proto_id == 0
            and req.len_field == 1 + 1 + len(req.data)
            and self._is_this_unit(req.unit)
        )

    def _have_modbus_request(self, buf):
        """Parse ``buf`` into a request object (hook for subclasses)."""
        return modbus_request(buf)

    def _print_client(self, client, req):
        """Print the peer address and the decoded request fields."""
        client_info = "Client {} ".format(client.writer.get_extra_info('peername'))
        client_info += "transaction id: 0x{:x}, protocol id: 0x{:x}," \
            "len_field: {}, unit: 0x{:x}, ".format(
                req.tran_id, req.proto_id, req.len_field, req.unit)
        client_info += "function code: 0x{:x}, data length: {} bytes".format(
            req.fcode, len(req.data))
        print(client_info)

    def _have_modbus_response(self, req, fcode, data):
        """Build the response object for ``req`` (hook for subclasses)."""
        return modbus_response(req, fcode, data)

    def _have_response_frame(self, res):
        """Serialise a response object into the bytes sent on the wire."""
        header = b''.join((
            res.tran_id.to_bytes(length=2, byteorder='big'),
            res.proto_id.to_bytes(length=2, byteorder='big'),
            res.len_field.to_bytes(length=2, byteorder='big'),
            res.unit.to_bytes(length=1, byteorder='big'),
            res.fcode.to_bytes(length=1, byteorder='big'),
        ))
        return header + res.data

    async def _handle_transaction(self, client, buf):
        """Process one received chunk and write the reply to the client.

        Returns:
            0 when a response (normal or exception) was written, -1 when the
            chunk is too short or its header is rejected; the caller closes
            the connection in the latter case.
        """
        if not self._frame_ready(buf):
            return -1
        req = self._have_modbus_request(buf)
        self._print_client(client, req)
        if not self._check_header(req):
            return -1
        err = check_pdu(req.fcode, req.data)
        if err is not None:
            # have_modbus_exception takes (req, fcode, error); omitting the
            # request raised TypeError instead of sending the exception reply.
            res_fcode, res_data = have_modbus_exception(req, req.fcode, err)
        else:
            res_fcode, res_data = func_dict[req.fcode](req.fcode, req.data)
        res = self._have_modbus_response(req, res_fcode, res_data)
        res_frame = self._have_response_frame(res)
        client.writer.write(res_frame)
        return 0

    async def _handle_client(self, client):
        """Serve one connection until the peer leaves or a frame is rejected.

        NOTE(review): read() blocks until data or EOF arrives, so the client's
        idle stopwatch is only bookkeeping here; enforcing the idle limit
        would require wrapping the read in asyncio.wait_for.
        """
        while True:
            try:
                buf = await client.reader.read(MODBUS_TCP_MAX_BUF_LEN)
            except ConnectionError:
                # Peer reset the connection: treat it like EOF.
                buf = b''
            if not buf:
                # An empty read means EOF. Reading again would return
                # immediately and spin forever, so close and stop.
                client.close()
                return None
            client.stop_timing()
            err = await self._handle_transaction(client, buf)
            if err != 0:
                client.close()
                return None
            client.start_timing()

    async def _run_server(self, reader, writer):
        """Connection callback: announce the peer and serve it."""
        addr = writer.get_extra_info('peername')
        print("New client from {}".format(addr))
        client = modbus_client(reader, writer)
        # Awaited directly rather than spawned as a detached task: the event
        # loop keeps only weak references to tasks created via create_task.
        await self._handle_client(client)

    def run(self):
        """Create an event loop, start listening and serve until Ctrl-C."""
        self.loop = asyncio.new_event_loop()
        coro = asyncio.start_server(self._run_server, self.host, self.port)
        self.loop.run_until_complete(coro)
        try:
            self.loop.run_forever()
        except KeyboardInterrupt:
            self.close()

    def close(self):
        """Close the event loop; a no-op if run() never created one."""
        loop = getattr(self, 'loop', None)
        if loop is not None:
            loop.close()

    def __del__(self):
        self.close()
class modbus_tls_request(modbus_request):
    """Request that consists only of a function code and its data."""

    def parse_request(self, buf):
        """Split ``buf`` into the first byte (function code) and the rest."""
        code_byte, payload = buf[0:1], buf[1:]
        self.fcode = int.from_bytes(code_byte, byteorder='big')
        self.data = payload
class modbus_tls_response(modbus_response):
    """Response that carries only a function code and its data."""

    def build_response(self, req, fcode, data):
        """Store ``fcode`` and ``data``; ``req`` is not consulted."""
        self.fcode, self.data = fcode, data
class modbus_tls_server(modbus_server):
    """Server variant that listens over TLS and exchanges header-less frames.

    Frames consist of the function code followed by its data, so the unit
    and header checks of the base class are replaced by hooks that always
    accept.
    """

    def __init__(self, host='localhost', port=8020, cert=None, key=None):
        """Store the endpoint and build a server-side TLS context.

        Args:
            host: address to listen on.
            port: TCP port to listen on.
            cert: certificate file handed to ``load_cert_chain``.
            key: private key file handed to ``load_cert_chain``.
        """
        self.host, self.port = host, port
        self.context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        self.context.load_cert_chain(cert, key)

    def _is_this_unit(self, unit):
        """Accept every unit id."""
        return True

    def _frame_ready(self, buf):
        """Return True once at least two bytes have been received."""
        return len(buf) >= 2

    def _check_header(self, req):
        """Accept every request; there is no header to validate."""
        return True

    def _have_modbus_request(self, buf):
        """Parse ``buf`` as a header-less request."""
        return modbus_tls_request(buf)

    def _print_client(self, client, req):
        """Print the peer address, function code and payload size."""
        peer = client.writer.get_extra_info('peername')
        summary = "function code: 0x{:x}, data length: {} bytes".format(
            req.fcode, len(req.data))
        print("Client {} ".format(peer) + summary)

    def _have_modbus_response(self, req, fcode, data):
        """Build a header-less response."""
        return modbus_tls_response(req, fcode, data)

    def _have_response_frame(self, res):
        """Serialise the response as one function-code byte plus its data."""
        return res.fcode.to_bytes(length=1, byteorder='big') + res.data

    def run(self):
        """Create an event loop, start the TLS listener and serve until Ctrl-C."""
        self.loop = asyncio.new_event_loop()
        listener = asyncio.start_server(
            self._run_server, self.host, self.port, ssl=self.context)
        self.loop.run_until_complete(listener)
        try:
            self.loop.run_forever()
        except KeyboardInterrupt:
            self.close()
if __name__ == '__main__':
    # Script entry point: serve plain MODBUS/TCP with the default settings
    # until the server's run() returns.
    server = modbus_server()
    banner = "MODBUS server listens on {}:{}".format(server.host, server.port)
    print(banner)
    server.run()
    print("MODBUS server closed")