Skip to content

Commit 6a16408

Browse files
committed
Merge branch 'pr-575-fix-sync-client-trans' into test-fixes
2 parents 593e187 + 24c0ffc commit 6a16408

File tree

6 files changed

+424
-15
lines changed

6 files changed

+424
-15
lines changed

pymodbus/client/sync.py

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,12 @@ def _recv(self, size):
258258
""" Reads data from the underlying descriptor
259259
260260
:param size: The number of bytes to read
261-
:return: The bytes read
261+
:return: The bytes read if the peer sent a response, or a zero-length
262+
response if no data packets were received from the client at
263+
all.
264+
:raises: ConnectionException if the socket is not initialized, or the
265+
peer either has closed the connection before this method is
266+
invoked or closes it before sending any data before timeout.
262267
"""
263268
if not self.socket:
264269
raise ConnectionException(self.__str__())
@@ -275,9 +280,9 @@ def _recv(self, size):
275280

276281
timeout = self.timeout
277282

278-
# If size isn't specified read 1 byte at a time.
283+
# If size isn't specified read up to 4096 bytes at a time.
279284
if size is None:
280-
recv_size = 1
285+
recv_size = 4096
281286
else:
282287
recv_size = size
283288

@@ -289,6 +294,9 @@ def _recv(self, size):
289294
ready = select.select([self.socket], [], [], end - time_)
290295
if ready[0]:
291296
recv_data = self.socket.recv(recv_size)
297+
if recv_data == b'':
298+
return self._handle_abrupt_socket_close(
299+
size, data, time.time() - time_)
292300
data.append(recv_data)
293301
data_length += len(recv_data)
294302
time_ = time.time()
@@ -305,6 +313,35 @@ def _recv(self, size):
305313

306314
return b"".join(data)
307315

316+
def _handle_abrupt_socket_close(self, size, data, duration):
317+
""" Handle unexpected socket close by remote end
318+
319+
Intended to be invoked after determining that the remote end
320+
has unexpectedly closed the connection, to clean up and handle
321+
the situation appropriately.
322+
323+
:param size: The number of bytes that was attempted to read
324+
:param data: The actual data returned
325+
:param duration: Duration from the read was first attempted
326+
until it was determined that the remote closed the
327+
socket
328+
:return: The more than zero bytes read from the remote end
329+
:raises: ConnectionException If the remote end didn't send any
330+
data at all before closing the connection.
331+
"""
332+
self.close()
333+
readsize = ("read of %s bytes" % size if size
334+
else "unbounded read")
335+
msg = ("%s: Connection unexpectedly closed "
336+
"%.6f seconds into %s" % (self, duration, readsize))
337+
if data:
338+
result = b"".join(data)
339+
msg += " after returning %s bytes" % len(result)
340+
_logger.warning(msg)
341+
return result
342+
msg += " without response from unit before it closed connection"
343+
raise ConnectionException(msg)
344+
308345
def is_socket_open(self):
309346
return True if self.socket is not None else False
310347

pymodbus/client/sync_diag.py

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
import socket
2+
import logging
3+
import time
4+
5+
from pymodbus.constants import Defaults
6+
from pymodbus.client.sync import ModbusTcpClient
7+
from pymodbus.transaction import ModbusSocketFramer
8+
from pymodbus.exceptions import ConnectionException
9+
10+
_logger = logging.getLogger(__name__)
11+
12+
LOG_MSGS = {
13+
'conn_msg': 'Connecting to modbus device %s',
14+
'connfail_msg': 'Connection to (%s, %s) failed: %s',
15+
'discon_msg': 'Disconnecting from modbus device %s',
16+
'timelimit_read_msg':
17+
'Modbus device read took %.4f seconds, '
18+
'returned %s bytes in timelimit read',
19+
'timeout_msg':
20+
'Modbus device timeout after %.4f seconds, '
21+
'returned %s bytes %s',
22+
'delay_msg':
23+
'Modbus device read took %.4f seconds, '
24+
'returned %s bytes of %s expected',
25+
'read_msg':
26+
'Modbus device read took %.4f seconds, '
27+
'returned %s bytes of %s expected',
28+
'unexpected_dc_msg': '%s %s'}
29+
30+
31+
class ModbusTcpDiagClient(ModbusTcpClient):
32+
"""
33+
Variant of pymodbus.client.sync.ModbusTcpClient with additional
34+
logging to diagnose network issues.
35+
36+
The following events are logged:
37+
38+
+---------+-----------------------------------------------------------------+
39+
| Level | Events |
40+
+=========+=================================================================+
41+
| ERROR | Failure to connect to modbus unit; unexpected disconnect by |
42+
| | modbus unit |
43+
+---------+-----------------------------------------------------------------+
44+
| WARNING | Timeout on normal read; read took longer than warn_delay_limit |
45+
+---------+-----------------------------------------------------------------+
46+
| INFO | Connection attempt to modbus unit; disconnection from modbus |
47+
| | unit; each time limited read |
48+
+---------+-----------------------------------------------------------------+
49+
| DEBUG | Normal read with timing information |
50+
+---------+-----------------------------------------------------------------+
51+
52+
Reads are differentiated between "normal", which reads a specified number of
53+
bytes, and "time limited", which reads all data for a duration equal to the
54+
timeout period configured for this instance.
55+
"""
56+
57+
# pylint: disable=no-member
58+
59+
def __init__(self, host='127.0.0.1', port=Defaults.Port,
60+
framer=ModbusSocketFramer, **kwargs):
61+
""" Initialize a client instance
62+
63+
The keys of LOG_MSGS can be used in kwargs to customize the messages.
64+
65+
:param host: The host to connect to (default 127.0.0.1)
66+
:param port: The modbus port to connect to (default 502)
67+
:param source_address: The source address tuple to bind to (default ('', 0))
68+
:param timeout: The timeout to use for this socket (default Defaults.Timeout)
69+
:param warn_delay_limit: Log reads that take longer than this as warning.
70+
Default True sets it to half of "timeout". None never logs these as
71+
warning, 0 logs everything as warning.
72+
:param framer: The modbus framer to use (default ModbusSocketFramer)
73+
74+
.. note:: The host argument will accept ipv4 and ipv6 hosts
75+
"""
76+
self.warn_delay_limit = kwargs.get('warn_delay_limit', True)
77+
super().__init__(host, port, framer, **kwargs)
78+
if self.warn_delay_limit is True:
79+
self.warn_delay_limit = self.timeout / 2
80+
81+
# Set logging messages, defaulting to LOG_MSGS
82+
for (k, v) in LOG_MSGS.items():
83+
self.__dict__[k] = kwargs.get(k, v)
84+
85+
def connect(self):
86+
""" Connect to the modbus tcp server
87+
88+
:returns: True if connection succeeded, False otherwise
89+
"""
90+
if self.socket:
91+
return True
92+
try:
93+
_logger.info(self.conn_msg, self)
94+
self.socket = socket.create_connection(
95+
(self.host, self.port),
96+
timeout=self.timeout,
97+
source_address=self.source_address)
98+
except socket.error as msg:
99+
_logger.error(self.connfail_msg, self.host, self.port, msg)
100+
self.close()
101+
return self.socket is not None
102+
103+
def close(self):
104+
""" Closes the underlying socket connection
105+
"""
106+
if self.socket:
107+
_logger.info(self.discon_msg, self)
108+
self.socket.close()
109+
self.socket = None
110+
111+
def _recv(self, size):
112+
try:
113+
start = time.time()
114+
115+
result = super()._recv(size)
116+
117+
delay = time.time() - start
118+
if self.warn_delay_limit is not None and delay >= self.warn_delay_limit:
119+
self._log_delayed_response(len(result), size, delay)
120+
elif not size:
121+
_logger.debug(self.timelimit_read_msg, delay, len(result))
122+
else:
123+
_logger.debug(self.read_msg, delay, len(result), size)
124+
125+
return result
126+
except ConnectionException as ex:
127+
# Only log actual network errors, "if not self.socket" then it's a internal code issue
128+
if 'Connection unexpectedly closed' in ex.string:
129+
_logger.error(self.unexpected_dc_msg, self, ex)
130+
raise ex
131+
132+
def _log_delayed_response(self, result_len, size, delay):
133+
if not size and result_len > 0:
134+
_logger.info(self.timelimit_read_msg, delay, result_len)
135+
elif (result_len == 0 or (size and result_len < size)) and delay >= self.timeout:
136+
read_type = ("of %i expected" % size) if size else "in timelimit read"
137+
_logger.warning(self.timeout_msg, delay, result_len, read_type)
138+
else:
139+
_logger.warning(self.delay_msg, delay, result_len, size)
140+
141+
def __str__(self):
142+
""" Builds a string representation of the connection
143+
144+
:returns: The string representation
145+
"""
146+
return "ModbusTcpDiagClient(%s:%s)" % (self.host, self.port)
147+
148+
149+
def get_client():
150+
""" Returns an appropriate client based on logging level
151+
152+
This will be ModbusTcpDiagClient by default, or the parent class
153+
if the log level is such that the diagnostic client will not log
154+
anything.
155+
156+
:returns: ModbusTcpClient or a child class thereof
157+
"""
158+
return ModbusTcpDiagClient if _logger.isEnabledFor(logging.ERROR) else ModbusTcpClient
159+
160+
161+
# --------------------------------------------------------------------------- #
162+
# Exported symbols
163+
# --------------------------------------------------------------------------- #
164+
165+
__all__ = [
166+
"ModbusTcpDiagClient", "get_client"
167+
]

pymodbus/transaction.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ def execute(self, request):
217217
"/Unable to decode response")
218218
response = ModbusIOException(last_exception,
219219
request.function_code)
220+
self.client.close()
220221
if hasattr(self.client, "state"):
221222
_logger.debug("Changing transaction state from "
222223
"'PROCESSING REPLY' to "
@@ -227,6 +228,7 @@ def execute(self, request):
227228
return response
228229
except ModbusIOException as ex:
229230
# Handle decode errors in processIncomingPacket method
231+
self.client.close()
230232
_logger.exception(ex)
231233
self.client.close()
232234
self.client.state = ModbusTransactionState.TRANSACTION_COMPLETE
@@ -319,9 +321,10 @@ def _recv(self, expected_response_length, full):
319321

320322
read_min = self.client.framer.recvPacket(min_size)
321323
if len(read_min) != min_size:
324+
msg_start = "Incomplete message" if read_min else "No response"
322325
raise InvalidMessageReceivedException(
323-
"Incomplete message received, expected at least %d bytes "
324-
"(%d received)" % (min_size, len(read_min))
326+
"%s received, expected at least %d bytes "
327+
"(%d received)" % (msg_start, min_size, len(read_min))
325328
)
326329
if read_min:
327330
if isinstance(self.client.framer, ModbusSocketFramer):
@@ -356,9 +359,14 @@ def _recv(self, expected_response_length, full):
356359
result = read_min + result
357360
actual = len(result)
358361
if total is not None and actual != total:
359-
_logger.debug("Incomplete message received, "
362+
msg_start = "Incomplete message" if actual else "No response"
363+
_logger.debug("{} received, "
360364
"Expected {} bytes Recieved "
361-
"{} bytes !!!!".format(total, actual))
365+
"{} bytes !!!!".format(msg_start, total, actual))
366+
elif actual == 0:
367+
# If actual == 0 and total is not None then the above
368+
# should be triggered, so total must be None here
369+
_logger.debug("No response received to unbounded read !!!!")
362370
if self.client.state != ModbusTransactionState.PROCESSING_REPLY:
363371
_logger.debug("Changing transaction state from "
364372
"'WAITING FOR REPLY' to 'PROCESSING REPLY'")

0 commit comments

Comments
 (0)