Skip to content

Commit 93306bc

Browse files
authored
Merge pull request #537 from Cougar/fix-533
Rewrite Tornado AsyncModbusSerialClient - #533
2 parents da6ab36 + 524ed34 commit 93306bc

File tree

3 files changed

+171
-4
lines changed

3 files changed

+171
-4
lines changed

pymodbus/client/asynchronous/tornado/__init__.py

Lines changed: 156 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import logging
99

10+
import time
1011
import socket
1112
from serial import Serial
1213
from tornado import gen
@@ -17,8 +18,15 @@
1718

1819
from pymodbus.client.asynchronous.mixins import (AsyncModbusClientMixin,
1920
AsyncModbusSerialClientMixin)
20-
from pymodbus.exceptions import ConnectionException
21-
from pymodbus.utilities import hexlify_packets
21+
22+
from pymodbus.compat import byte2int
23+
from pymodbus.exceptions import (ConnectionException,
24+
ModbusIOException,
25+
TimeOutException)
26+
from pymodbus.utilities import (hexlify_packets,
27+
ModbusTransactionState)
28+
from pymodbus.constants import Defaults
29+
2230

2331
LOGGER = logging.getLogger(__name__)
2432

@@ -291,6 +299,24 @@ class AsyncModbusSerialClient(BaseTornadoSerialClient):
291299
"""
292300
Tornado based asynchronous serial client
293301
"""
302+
def __init__(self, *args, **kwargs):
303+
"""
304+
Initializes AsyncModbusSerialClient.
305+
:param args:
306+
:param kwargs:
307+
"""
308+
self.state = ModbusTransactionState.IDLE
309+
self.timeout = kwargs.get('timeout', Defaults.Timeout)
310+
self.baudrate = kwargs.get('baudrate', Defaults.Baudrate)
311+
if self.baudrate > 19200:
312+
self.silent_interval = 1.75 / 1000 # ms
313+
else:
314+
self._t0 = float((1 + 8 + 2)) / self.baudrate
315+
self.silent_interval = 3.5 * self._t0
316+
self.silent_interval = round(self.silent_interval, 6)
317+
self.last_frame_end = 0.0
318+
super().__init__(*args, **kwargs)
319+
294320
def get_socket(self):
295321
"""
296322
Creates Pyserial object
@@ -318,6 +344,134 @@ def connect(self):
318344

319345
raise gen.Return(self)
320346

347+
def execute(self, request):
348+
"""
349+
Executes a transaction
350+
:param request: Request to be written on to the bus
351+
:return:
352+
"""
353+
request.transaction_id = self.transaction.getNextTID()
354+
355+
def _clear_timer():
356+
"""
357+
Clear serial waiting timeout
358+
"""
359+
if self.timeout_handle:
360+
self.io_loop.remove_timeout(self.timeout_handle)
361+
self.timeout_handle = None
362+
363+
def _on_timeout():
364+
"""
365+
Got timeout while waiting data from serial port
366+
"""
367+
LOGGER.warning("serial receive timeout")
368+
_clear_timer()
369+
if self.stream:
370+
self.io_loop.remove_handler(self.stream.fileno())
371+
self.framer.resetFrame()
372+
transaction = self.transaction.getTransaction(request.transaction_id)
373+
if transaction:
374+
transaction.set_exception(TimeOutException())
375+
376+
def _on_write_done(*args):
377+
"""
378+
Set up reader part after sucessful write to the serial
379+
"""
380+
LOGGER.debug("frame sent, waiting for a reply")
381+
self.last_frame_end = round(time.time(), 6)
382+
self.state = ModbusTransactionState.WAITING_FOR_REPLY
383+
self.io_loop.add_handler(self.stream.fileno(), _on_receive, IOLoop.READ)
384+
385+
def _on_fd_error(fd, *args):
386+
_clear_timer()
387+
self.io_loop.remove_handler(fd)
388+
self.close()
389+
self.transaction.getTransaction(request.transaction_id).set_exception(ModbusIOException(*args))
390+
391+
def _on_receive(fd, events):
392+
"""
393+
New data in serial buffer to read or serial port closed
394+
"""
395+
if events & IOLoop.ERROR:
396+
_on_fd_error(fd)
397+
return
398+
399+
try:
400+
waiting = self.stream.connection.in_waiting
401+
if waiting:
402+
data = self.stream.connection.read(waiting)
403+
LOGGER.debug(
404+
"recv: " + hexlify_packets(data))
405+
self.last_frame_end = round(time.time(), 6)
406+
except OSError as ex:
407+
_on_fd_error(fd, ex)
408+
return
409+
410+
self.framer.addToFrame(data)
411+
412+
# check if we have regular frame or modbus exception
413+
fcode = self.framer.decode_data(self.framer.getRawFrame()).get("fcode", 0)
414+
if fcode and (
415+
(fcode > 0x80 and len(self.framer.getRawFrame()) == exception_response_length)
416+
or
417+
(len(self.framer.getRawFrame()) == expected_response_length)
418+
):
419+
_clear_timer()
420+
self.io_loop.remove_handler(fd)
421+
self.state = ModbusTransactionState.IDLE
422+
self.framer.processIncomingPacket(
423+
b'', # already sent via addToFrame()
424+
self._handle_response,
425+
0, # don't care when `single=True`
426+
single=True,
427+
tid=request.transaction_id
428+
)
429+
430+
packet = self.framer.buildPacket(request)
431+
f = self._build_response(request.transaction_id)
432+
433+
response_pdu_size = request.get_response_pdu_size()
434+
expected_response_length = self.transaction._calculate_response_length(response_pdu_size)
435+
LOGGER.debug("expected_response_length = %d", expected_response_length)
436+
437+
exception_response_length = self.transaction._calculate_exception_length() # TODO: calculate once
438+
439+
if self.timeout:
440+
self.timeout_handle = self.io_loop.add_timeout(time.time() + self.timeout, _on_timeout)
441+
self._sendPacket(packet, callback=_on_write_done)
442+
443+
return f
444+
445+
def _sendPacket(self, message, callback):
446+
"""
447+
Sends packets on the bus with 3.5char delay between frames
448+
:param message: Message to be sent over the bus
449+
:return:
450+
"""
451+
@gen.coroutine
452+
def sleep(timeout):
453+
yield gen.sleep(timeout)
454+
455+
try:
456+
waiting = self.stream.connection.in_waiting
457+
if waiting:
458+
result = self.stream.connection.read(waiting)
459+
LOGGER.info(
460+
"Cleanup recv buffer before send: " + hexlify_packets(result))
461+
except OSError as e:
462+
self.transaction.getTransaction(request.transaction_id).set_exception(ModbusIOException(e))
463+
return
464+
465+
start = time.time()
466+
if self.last_frame_end:
467+
waittime = self.last_frame_end + self.silent_interval - start
468+
if waittime > 0:
469+
LOGGER.debug("Waiting for 3.5 char before next send - %f ms", waittime)
470+
sleep(waittime)
471+
472+
self.state = ModbusTransactionState.SENDING
473+
LOGGER.debug("send: " + hexlify_packets(message))
474+
self.stream.write(message, callback)
321475

322476
class AsyncModbusTCPClient(BaseTornadoClient):
323477
"""

pymodbus/exceptions.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,17 @@ def __init__(self, string=""):
105105
message = '[Error registering message] %s' % string
106106
ModbusException.__init__(self, message)
107107

108+
class TimeOutException(ModbusException):
109+
""" Error resulting from modbus response timeout """
110+
111+
def __init__(self, string=""):
112+
""" Initialize the exception
113+
114+
:param string: The message to append to the error
115+
"""
116+
message = "[Timeout] %s" % string
117+
ModbusException.__init__(self, message)
118+
108119

109120
# --------------------------------------------------------------------------- #
110121
# Exported symbols
@@ -114,5 +125,5 @@ def __init__(self, string=""):
114125
"ParameterException", "NotImplementedException",
115126
"ConnectionException", "NoSuchSlaveException",
116127
"InvalidMessageReceivedException",
117-
"MessageRegisterException"
128+
"MessageRegisterException", "TimeOutException"
118129
]

test/test_client_async_tornado.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,12 @@ def testSerialClientExecute(self, mock_serial, mock_seriostream, mock_ioloop):
228228
client = AsyncModbusSerialClient(ioloop=schedulers.IO_LOOP,
229229
framer=ModbusRtuFramer(
230230
ClientDecoder()),
231-
port=SERIAL_PORT)
231+
port=SERIAL_PORT,
232+
timeout=0)
232233
client.connect()
233234
client.stream = Mock()
234235
client.stream.write = Mock()
236+
client.stream.connection.read.return_value = b''
235237

236238
request = ReadCoilsRequest(1, 1)
237239
d = client.execute(request)

0 commit comments

Comments
 (0)