diff --git a/pymodbus/framer/ascii_framer.py b/pymodbus/framer/ascii_framer.py index 593cb5799..9d68ea46a 100644 --- a/pymodbus/framer/ascii_framer.py +++ b/pymodbus/framer/ascii_framer.py @@ -42,12 +42,6 @@ def __init__(self, decoder, client=None): self._start = b":" self._end = b"\r\n" - # ----------------------------------------------------------------------- # - # Private Helper Functions - # ----------------------------------------------------------------------- # - def _process(self, callback, error=False): - """Process incoming packets irrespective error condition.""" - def decode_data(self, data): """Decode data.""" if len(data) > 1: @@ -56,65 +50,27 @@ def decode_data(self, data): return {"slave": uid, "fcode": fcode} return {} - def checkFrame(self): - """Check and decode the next frame. - - :returns: True if we successful, False otherwise - """ - start = self._buffer.find(self._start) - if start == -1: - return False - if start > 0: # go ahead and skip old bad data - self._buffer = self._buffer[start:] - start = 0 - - if (end := self._buffer.find(self._end)) != -1: - self._header["len"] = end - self._header["uid"] = int(self._buffer[1:3], 16) - self._header["lrc"] = int(self._buffer[end - 2 : end], 16) - data = a2b_hex(self._buffer[start + 1 : end - 2]) - return MessageAscii.check_LRC(data, self._header["lrc"]) - return False - - def advanceFrame(self): - """Skip over the current framed message. - - This allows us to skip over the current message after we have processed - it or determined that it contains an error. It also has to reset the - current frame header handle - """ - self._buffer = self._buffer[self._header["len"] + 2 :] - self._header = {"lrc": "0000", "len": 0, "uid": 0x00} - - def isFrameReady(self): - """Check if we should continue decode logic. - - This is meant to be used in a while loop in the decoding phase to let - the decoder know that there is still data in the buffer. - - :returns: True if ready, False otherwise - """ - return len(self._buffer) > 1 - - def getFrame(self): - """Get the next frame from the buffer. - - :returns: The frame data or "" - """ - start = self._hsize + 1 - end = self._header["len"] - 2 - buffer = self._buffer[start:end] - if end > 0: - return a2b_hex(buffer) - return b"" - - # ----------------------------------------------------------------------- # - # Public Member Functions - # ----------------------------------------------------------------------- # def frameProcessIncomingPacket(self, single, callback, slave, _tid=None, **kwargs): """Process new packet pattern.""" - while self.isFrameReady(): - if not self.checkFrame(): + def check_frame(self): + """Check and decode the next frame.""" + start = self._buffer.find(self._start) + if start == -1: + return False + if start > 0: # go ahead and skip old bad data + self._buffer = self._buffer[start:] + start = 0 + + if (end := self._buffer.find(self._end)) != -1: + self._header["len"] = end + self._header["uid"] = int(self._buffer[1:3], 16) + self._header["lrc"] = int(self._buffer[end - 2 : end], 16) + data = a2b_hex(self._buffer[start + 1 : end - 2]) + return MessageAscii.check_LRC(data, self._header["lrc"]) + return False + + while len(self._buffer) > 1: + if not check_frame(self): break if not self._validate_slave_id(slave, single): header_txt = self._header["uid"] @@ -122,11 +78,18 @@ def frameProcessIncomingPacket(self, single, callback, slave, _tid=None, **kwarg self.resetFrame() continue - frame = self.getFrame() + start = self._hsize + 1 + end = self._header["len"] - 2 + buffer = self._buffer[start:end] + if end > 0: + frame = a2b_hex(buffer) + else: + frame = b"" if (result := self.decoder.decode(frame)) is None: raise ModbusIOException("Unable to decode response") self.populateResult(result) - self.advanceFrame() + self._buffer = self._buffer[self._header["len"] + 2 :] + self._header = {"lrc": "0000", "len": 0, "uid": 0x00} callback(result) # defer this def buildPacket(self, message): diff --git a/pymodbus/framer/binary_framer.py b/pymodbus/framer/binary_framer.py index 561d9589b..07808a4de 100644 --- a/pymodbus/framer/binary_framer.py +++ b/pymodbus/framer/binary_framer.py @@ -53,12 +53,6 @@ def __init__(self, decoder, client=None): self._end = b"\x7d" # } self._repeat = [b"}"[0], b"{"[0]] # python3 hack - # ----------------------------------------------------------------------- # - # Private Helper Functions - # ----------------------------------------------------------------------- # - def _process(self, callback, error=False): - """Process incoming packets irrespective error condition.""" - def decode_data(self, data): """Decode data.""" if len(data) > self._hsize: @@ -67,76 +61,45 @@ def decode_data(self, data): return {"slave": uid, "fcode": fcode} return {} - def checkFrame(self) -> bool: - """Check and decode the next frame. - - :returns: True if we are successful, False otherwise - """ - start = self._buffer.find(self._start) - if start == -1: - return False - if start > 0: # go ahead and skip old bad data - self._buffer = self._buffer[start:] - - if (end := self._buffer.find(self._end)) != -1: - self._header["len"] = end - self._header["uid"] = struct.unpack(">B", self._buffer[1:2])[0] - self._header["crc"] = struct.unpack(">H", self._buffer[end - 2 : end])[0] - data = self._buffer[1 : end - 2] - return checkCRC(data, self._header["crc"]) - return False - - def advanceFrame(self) -> None: - """Skip over the current framed message. - - This allows us to skip over the current message after we have processed - it or determined that it contains an error. It also has to reset the - current frame header handle - """ - self._buffer = self._buffer[self._header["len"] + 2 :] - self._header = {"crc": 0x0000, "len": 0, "uid": 0x00} - - def isFrameReady(self) -> bool: - """Check if we should continue decode logic. - - This is meant to be used in a while loop in the decoding phase to let - the decoder know that there is still data in the buffer. - - :returns: True if ready, False otherwise - """ - return len(self._buffer) > 1 - - def getFrame(self): - """Get the next frame from the buffer. - - :returns: The frame data or "" - """ - start = self._hsize + 1 - end = self._header["len"] - 2 - buffer = self._buffer[start:end] - if end > 0: - return buffer - return b"" - - # ----------------------------------------------------------------------- # - # Public Member Functions - # ----------------------------------------------------------------------- # def frameProcessIncomingPacket(self, single, callback, slave, _tid=None, **kwargs): """Process new packet pattern.""" - while self.isFrameReady(): - if not self.checkFrame(): + def check_frame(self) -> bool: + """Check and decode the next frame.""" + start = self._buffer.find(self._start) + if start == -1: + return False + if start > 0: # go ahead and skip old bad data + self._buffer = self._buffer[start:] + + if (end := self._buffer.find(self._end)) != -1: + self._header["len"] = end + self._header["uid"] = struct.unpack(">B", self._buffer[1:2])[0] + self._header["crc"] = struct.unpack(">H", self._buffer[end - 2 : end])[0] + data = self._buffer[1 : end - 2] + return checkCRC(data, self._header["crc"]) + return False + + while len(self._buffer) > 1: + if not check_frame(self): Log.debug("Frame check failed, ignoring!!") - self.resetFrame() break if not self._validate_slave_id(slave, single): header_txt = self._header["uid"] Log.debug("Not a valid slave id - {}, ignoring!!", header_txt) self.resetFrame() break - if (result := self.decoder.decode(self.getFrame())) is None: + start = self._hsize + 1 + end = self._header["len"] - 2 + buffer = self._buffer[start:end] + if end > 0: + frame = buffer + else: + frame = b"" + if (result := self.decoder.decode(frame)) is None: raise ModbusIOException("Unable to decode response") self.populateResult(result) - self.advanceFrame() + self._buffer = self._buffer[self._header["len"] + 2 :] + self._header = {"crc": 0x0000, "len": 0, "uid": 0x00} callback(result) # defer or push to a thread? def buildPacket(self, message): diff --git a/pymodbus/framer/rtu_framer.py b/pymodbus/framer/rtu_framer.py index 192a8a764..65c36a661 100644 --- a/pymodbus/framer/rtu_framer.py +++ b/pymodbus/framer/rtu_framer.py @@ -4,7 +4,6 @@ import time from pymodbus.exceptions import ( - InvalidMessageReceivedException, ModbusIOException, ) from pymodbus.framer.base import BYTE_ORDER, FRAME_HEADER, ModbusFramer @@ -64,9 +63,6 @@ def __init__(self, decoder, client=None): self._min_frame_size = 4 self.function_codes = decoder.lookup.keys() if decoder else {} - # ----------------------------------------------------------------------- # - # Private Helper Functions - # ----------------------------------------------------------------------- # def decode_data(self, data): """Decode data.""" if len(data) > self._hsize: @@ -75,157 +71,99 @@ def decode_data(self, data): return {"slave": uid, "fcode": fcode} return {} - def checkFrame(self): - """Check if the next frame is available. - Return True if we were successful. + def frameProcessIncomingPacket(self, _single, callback, slave, _tid=None, **kwargs): # noqa: C901 + """Process new packet pattern.""" - 1. Populate header - 2. Discard frame if UID does not match - """ - try: - self.populateHeader() - frame_size = self._header["len"] - data = self._buffer[: frame_size - 2] - crc = self._header["crc"] - crc_val = (int(crc[0]) << 8) + int(crc[1]) - return checkCRC(data, crc_val) - except (IndexError, KeyError, struct.error): + def is_frame_ready(self): + """Check if we should continue decode logic.""" + size = self._header.get("len", 0) + if not size and len(self._buffer) > self._hsize: + try: + self._header["uid"] = int(self._buffer[0]) + self._header["tid"] = int(self._buffer[0]) + func_code = int(self._buffer[1]) + pdu_class = self.decoder.lookupPduClass(func_code) + size = pdu_class.calculateRtuFrameSize(self._buffer) + self._header["len"] = size + + if len(self._buffer) < size: + raise IndexError + self._header["crc"] = self._buffer[size - 2 : size] + except IndexError: + return False + return len(self._buffer) >= size if size > 0 else False + + def get_frame_start(self, slaves, broadcast, skip_cur_frame): + """Scan buffer for a relevant frame start.""" + start = 1 if skip_cur_frame else 0 + if (buf_len := len(self._buffer)) < 4: + return False + for i in range(start, buf_len - 3): # + if not broadcast and self._buffer[i] not in slaves: + continue + if ( + self._buffer[i + 1] not in self.function_codes + and (self._buffer[i + 1] - 0x80) not in self.function_codes + ): + continue + if i: + self._buffer = self._buffer[i:] # remove preceding trash. + return True + if buf_len > 3: + self._buffer = self._buffer[-3:] return False - def advanceFrame(self): - """Skip over the current framed message. - - This allows us to skip over the current message after we have processed - it or determined that it contains an error. It also has to reset the - current frame header handle - """ - self._buffer = self._buffer[self._header["len"] :] - Log.debug("Frame advanced, resetting header!!") - self._header = {"uid": 0x00, "len": 0, "crc": b"\x00\x00"} - - def resetFrame(self): - """Reset the entire message frame. - - This allows us to skip over errors that may be in the stream. - It is hard to know if we are simply out of sync or if there is - an error in the stream as we have no way to check the start or - end of the message (python just doesn't have the resolution to - check for millisecond delays). - """ - x = self._buffer - super().resetFrame() - self._buffer = x - - def isFrameReady(self): - """Check if we should continue decode logic. - - This is meant to be used in a while loop in the decoding phase to let - the decoder know that there is still data in the buffer. - - :returns: True if ready, False otherwise - """ - size = self._header.get("len", 0) - if not size and len(self._buffer) > self._hsize: + def check_frame(self): + """Check if the next frame is available.""" try: - # Frame is ready only if populateHeader() successfully - # populates crc field which finishes RTU frame otherwise, - # if buffer is not yet long enough, populateHeader() raises IndexError - size = self.populateHeader() - except IndexError: + self._header["uid"] = int(self._buffer[0]) + self._header["tid"] = int(self._buffer[0]) + func_code = int(self._buffer[1]) + pdu_class = self.decoder.lookupPduClass(func_code) + size = pdu_class.calculateRtuFrameSize(self._buffer) + self._header["len"] = size + + if len(self._buffer) < size: + raise IndexError + self._header["crc"] = self._buffer[size - 2 : size] + frame_size = self._header["len"] + data = self._buffer[: frame_size - 2] + crc = self._header["crc"] + crc_val = (int(crc[0]) << 8) + int(crc[1]) + return checkCRC(data, crc_val) + except (IndexError, KeyError, struct.error): return False - return len(self._buffer) >= size if size > 0 else False - - def populateHeader(self, data=None): - """Try to set the headers `uid`, `len` and `crc`. - - This method examines `self._buffer` and writes meta - information into `self._header`. - - Beware that this method will raise an IndexError if - `self._buffer` is not yet long enough. - """ - data = data if data is not None else self._buffer - self._header["uid"] = int(data[0]) - self._header["tid"] = int(data[0]) - size = self.get_expected_response_length(data) - self._header["len"] = size - - if len(data) < size: - # crc yet not available - raise IndexError - self._header["crc"] = data[size - 2 : size] - return size - - def getFrame(self): - """Get the next frame from the buffer. - - :returns: The frame data or "" - """ - start = self._hsize - end = self._header["len"] - 2 - buffer = self._buffer[start:end] - if end > 0: - Log.debug("Getting Frame - {}", buffer, ":hex") - return buffer - return b"" - - def populateResult(self, result): - """Populate the modbus result header. - - The serial packets do not have any header information - that is copied. - - :param result: The response packet - """ - result.slave_id = self._header["uid"] - result.transaction_id = self._header["tid"] - - def getFrameStart(self, slaves, broadcast, skip_cur_frame): - """Scan buffer for a relevant frame start.""" - start = 1 if skip_cur_frame else 0 - if (buf_len := len(self._buffer)) < 4: - return False - for i in range(start, buf_len - 3): # - if not broadcast and self._buffer[i] not in slaves: - continue - if ( - self._buffer[i + 1] not in self.function_codes - and (self._buffer[i + 1] - 0x80) not in self.function_codes - ): - continue - if i: - self._buffer = self._buffer[i:] # remove preceding trash. - return True - if buf_len > 3: - self._buffer = self._buffer[-3:] - return False - - # ----------------------------------------------------------------------- # - # Public Member Functions - # ----------------------------------------------------------------------- # - def frameProcessIncomingPacket(self, _single, callback, slave, _tid=None, **kwargs): - """Process new packet pattern.""" broadcast = not slave[0] skip_cur_frame = False - while self.getFrameStart(slave, broadcast, skip_cur_frame): - if not self.isFrameReady(): + while get_frame_start(self, slave, broadcast, skip_cur_frame): + self._header = {"uid": 0x00, "len": 0, "crc": b"\x00\x00"} + if not is_frame_ready(self): Log.debug("Frame - not ready") break - if not self.checkFrame(): + if not check_frame(self): Log.debug("Frame check failed, ignoring!!") + x = self._buffer self.resetFrame() + self._buffer = x skip_cur_frame = True continue - # handled in getFrameStart - # if not self._validate_slave_id(slave, single): - # header_txt = self._header["uid"] - # Log.debug("Not a valid slave id - {}, ignoring!!", header_txt) - # self.resetFrame() - # skip_cur_frame = True - # continue - self._process(callback) + start = self._hsize + end = self._header["len"] - 2 + buffer = self._buffer[start:end] + if end > 0: + Log.debug("Getting Frame - {}", buffer, ":hex") + data = buffer + else: + data = b"" + if (result := self.decoder.decode(data)) is None: + raise ModbusIOException("Unable to decode request") + result.slave_id = self._header["uid"] + result.transaction_id = self._header["tid"] + self._buffer = self._buffer[self._header["len"] :] + Log.debug("Frame advanced, resetting header!!") + callback(result) # defer or push to a thread? def buildPacket(self, message): """Create a ready to send modbus packet. @@ -299,28 +237,3 @@ def recvPacket(self, size): result = self.client.recv(size) self.client.last_frame_end = round(time.time(), 6) return result - - def _process(self, callback, error=False): - """Process incoming packets irrespective error condition.""" - data = self._buffer if error else self.getFrame() - if (result := self.decoder.decode(data)) is None: - raise ModbusIOException("Unable to decode request") - if error and result.function_code < 0x80: - raise InvalidMessageReceivedException(str(result)) - self.populateResult(result) - self.advanceFrame() - callback(result) # defer or push to a thread? - - def get_expected_response_length(self, data): - """Get the expected response length. - - :param data: Message data read so far - :raises IndexError: If not enough data to read byte count - :return: Total frame size - """ - func_code = int(data[1]) - pdu_class = self.decoder.lookupPduClass(func_code) - return pdu_class.calculateRtuFrameSize(data) - - -# __END__ diff --git a/pymodbus/framer/socket_framer.py b/pymodbus/framer/socket_framer.py index f818911c4..dca7b3b94 100644 --- a/pymodbus/framer/socket_framer.py +++ b/pymodbus/framer/socket_framer.py @@ -3,7 +3,6 @@ import struct from pymodbus.exceptions import ( - InvalidMessageReceivedException, ModbusIOException, ) from pymodbus.framer.base import SOCKET_FRAME_HEADER, ModbusFramer @@ -44,65 +43,6 @@ def __init__(self, decoder, client=None): super().__init__(decoder, client) self._hsize = 0x07 - # ----------------------------------------------------------------------- # - # Private Helper Functions - # ----------------------------------------------------------------------- # - def checkFrame(self): - """Check and decode the next frame. - - Return true if we were successful. - """ - if not self.isFrameReady(): - return False - ( - self._header["tid"], - self._header["pid"], - self._header["len"], - self._header["uid"], - ) = struct.unpack(">HHHB", self._buffer[0 : self._hsize]) - - # someone sent us an error? ignore it - if self._header["len"] < 2: - self.advanceFrame() - # we have at least a complete message, continue - elif len(self._buffer) - self._hsize + 1 >= self._header["len"]: - return True - # we don't have enough of a message yet, wait - Log.debug("Frame check failed, missing part of message!!") - return False - - def advanceFrame(self): - """Skip over the current framed message. - - This allows us to skip over the current message after we have processed - it or determined that it contains an error. It also has to reset the - current frame header handle - """ - length = self._hsize + self._header["len"] -1 - self._buffer = self._buffer[length:] - self._header = {"tid": 0, "pid": 0, "len": 0, "uid": 0} - - def isFrameReady(self): - """Check if we should continue decode logic. - - This is meant to be used in a while loop in the decoding phase to let - the decoder factory know that there is still data in the buffer. - - :returns: True if ready, False otherwise - """ - return len(self._buffer) > self._hsize - - def getFrame(self): - """Return the next frame from the buffered data. - - :returns: The next full frame buffer - """ - length = self._hsize + self._header["len"] -1 - return self._buffer[self._hsize : length] - - # ----------------------------------------------------------------------- # - # Public Member Functions - # ----------------------------------------------------------------------- # def decode_data(self, data): """Decode data.""" if len(data) > self._hsize: @@ -130,30 +70,46 @@ def frameProcessIncomingPacket(self, single, callback, slave, tid=None, **kwargs The processed and decoded messages are pushed to the callback function to process and send. """ + def check_frame(self): + """Check and decode the next frame.""" + if not len(self._buffer) > self._hsize: + return False + ( + self._header["tid"], + self._header["pid"], + self._header["len"], + self._header["uid"], + ) = struct.unpack(">HHHB", self._buffer[0 : self._hsize]) + if self._header["len"] < 2: + length = self._hsize + self._header["len"] -1 + self._buffer = self._buffer[length:] + self._header = {"tid": 0, "pid": 0, "len": 0, "uid": 0} + elif len(self._buffer) - self._hsize + 1 >= self._header["len"]: + return True + Log.debug("Frame check failed, missing part of message!!") + return False + while True: - if not self.checkFrame(): + if not check_frame(self): return if not self._validate_slave_id(slave, single): header_txt = self._header["uid"] Log.debug("Not a valid slave id - {}, ignoring!!", header_txt) self.resetFrame() return - self._process(callback, tid) - - def _process(self, callback, tid, error=False): - """Process incoming packets irrespective error condition.""" - data = self._buffer if error else self.getFrame() - if (result := self.decoder.decode(data)) is None: - self.resetFrame() - raise ModbusIOException("Unable to decode request") - if error and result.function_code < 0x80: - raise InvalidMessageReceivedException(str(result)) - self.populateResult(result) - self.advanceFrame() - if tid and tid != result.transaction_id: - self.resetFrame() - else: - callback(result) # defer or push to a thread? + length = self._hsize + self._header["len"] -1 + data = self._buffer[self._hsize : length] + if (result := self.decoder.decode(data)) is None: + self.resetFrame() + raise ModbusIOException("Unable to decode request") + self.populateResult(result) + length = self._hsize + self._header["len"] -1 + self._buffer = self._buffer[length:] + self._header = {"tid": 0, "pid": 0, "len": 0, "uid": 0} + if tid and tid != result.transaction_id: + self.resetFrame() + else: + callback(result) # defer or push to a thread? def buildPacket(self, message): """Create a ready to send modbus packet. @@ -171,6 +127,3 @@ def buildPacket(self, message): ) packet += data return packet - - -# __END__ diff --git a/pymodbus/framer/tls_framer.py b/pymodbus/framer/tls_framer.py index 6b9a1d4c4..b3430864b 100644 --- a/pymodbus/framer/tls_framer.py +++ b/pymodbus/framer/tls_framer.py @@ -3,7 +3,6 @@ import struct from pymodbus.exceptions import ( - InvalidMessageReceivedException, ModbusIOException, ) from pymodbus.framer.base import TLS_FRAME_HEADER, ModbusFramer @@ -36,51 +35,6 @@ def __init__(self, decoder, client=None): super().__init__(decoder, client) self._hsize = 0x0 - # ----------------------------------------------------------------------- # - # Private Helper Functions - # ----------------------------------------------------------------------- # - def checkFrame(self): - """Check and decode the next frame. - - Return true if we were successful. - """ - if self.isFrameReady(): - # we have at least a complete message, continue - if len(self._buffer) - self._hsize >= 1: - return True - # we don't have enough of a message yet, wait - return False - - def advanceFrame(self): - """Skip over the current framed message. - - This allows us to skip over the current message after we have processed - it or determined that it contains an error. It also has to reset the - current frame header handle - """ - self._buffer = b"" - self._header = {} - - def isFrameReady(self): - """Check if we should continue decode logic. - - This is meant to be used in a while loop in the decoding phase to let - the decoder factory know that there is still data in the buffer. - - :returns: True if ready, False otherwise - """ - return len(self._buffer) > self._hsize - - def getFrame(self): - """Return the next frame from the buffered data. - - :returns: The next full frame buffer - """ - return self._buffer[self._hsize :] - - # ----------------------------------------------------------------------- # - # Public Member Functions - # ----------------------------------------------------------------------- # def decode_data(self, data): """Decode data.""" if len(data) > self._hsize: @@ -91,26 +45,31 @@ def decode_data(self, data): def frameProcessIncomingPacket(self, single, callback, slave, _tid=None, **kwargs): """Process new packet pattern.""" # no slave id for Modbus Security Application Protocol - if not self.isFrameReady(): + def check_frame(self): + """Check and decode the next frame.""" + if len(self._buffer) > self._hsize: + # we have at least a complete message, continue + if len(self._buffer) - self._hsize >= 1: + return True + # we don't have enough of a message yet, wait + return False + + if not len(self._buffer) > self._hsize: return - if not self.checkFrame(): + if not check_frame(self): Log.debug("Frame check failed, ignoring!!") self.resetFrame() return if not self._validate_slave_id(slave, single): Log.debug("Not in valid slave id - {}, ignoring!!", slave) self.resetFrame() - self._process(callback) - - def _process(self, callback, error=False): - """Process incoming packets irrespective error condition.""" - data = self._buffer if error else self.getFrame() + return + data = self._buffer[self._hsize :] if (result := self.decoder.decode(data)) is None: raise ModbusIOException("Unable to decode request") - if error and result.function_code < 0x80: - raise InvalidMessageReceivedException(str(result)) self.populateResult(result) - self.advanceFrame() + self._buffer = b"" + self._header = {} callback(result) # defer or push to a thread? def buildPacket(self, message): diff --git a/test/message/to_do_framers_py b/test/message/to_do_framers_py new file mode 100644 index 000000000..0eafac228 --- /dev/null +++ b/test/message/to_do_framers_py @@ -0,0 +1,521 @@ +"""Test framers.""" +from unittest import mock + +import pytest + +from pymodbus import Framer +from pymodbus.bit_read_message import ReadCoilsRequest +from pymodbus.client.base import ModbusBaseClient +from pymodbus.exceptions import ModbusIOException +from pymodbus.factory import ClientDecoder +from pymodbus.framer import ( + ModbusAsciiFramer, + ModbusBinaryFramer, + ModbusRtuFramer, + ModbusSocketFramer, +) +from pymodbus.transport import CommType +from pymodbus.utilities import ModbusTransactionState + + +BASE_PORT = 6600 + + +TEST_MESSAGE = b"\x00\x01\x00\x01\x00\n\xec\x1c" + + +class TestFramers: + """Test framers.""" + + slaves = [2, 17] + + @staticmethod + @pytest.fixture(name="rtu_framer") + def fixture_rtu_framer(): + """RTU framer.""" + return ModbusRtuFramer(ClientDecoder()) + + @staticmethod + @pytest.fixture(name="ascii_framer") + def fixture_ascii_framer(): + """Ascii framer.""" + return ModbusAsciiFramer(ClientDecoder()) + + + @pytest.mark.parametrize( + "framer", + [ + ModbusRtuFramer, + ModbusAsciiFramer, + ModbusBinaryFramer, + ], +) + def test_framer_initialization(self, framer): + """Test framer initialization.""" + decoder = ClientDecoder() + framer = framer(decoder) + assert framer.client is None + assert framer._buffer == b"" # pylint: disable=protected-access + assert framer.decoder == decoder + if isinstance(framer, ModbusAsciiFramer): + assert framer._header == { # pylint: disable=protected-access + "tid": 0, + "pid": 0, + "lrc": "0000", + "len": 0, + "uid": 0x00, + "crc": b"\x00\x00", + } + assert framer._hsize == 0x02 # pylint: disable=protected-access + assert framer._start == b":" # pylint: disable=protected-access + assert framer._end == b"\r\n" # pylint: disable=protected-access + elif isinstance(framer, ModbusRtuFramer): + assert framer._header == { # pylint: disable=protected-access + "tid": 0, + "pid": 0, + "lrc": "0000", + "uid": 0x00, + "len": 0, + "crc": b"\x00\x00", + } + assert framer._hsize == 0x01 # pylint: disable=protected-access + assert framer._end == b"\x0d\x0a" # pylint: disable=protected-access + assert framer._min_frame_size == 4 # pylint: disable=protected-access + else: + assert framer._header == { # pylint: disable=protected-access + "tid": 0, + "pid": 0, + "lrc": "0000", + "crc": b"\x00\x00", + "len": 0, + "uid": 0x00, + } + assert framer._hsize == 0x01 # pylint: disable=protected-access + assert framer._start == b"\x7b" # pylint: disable=protected-access + assert framer._end == b"\x7d" # pylint: disable=protected-access + assert framer._repeat == [ # pylint: disable=protected-access + b"}"[0], + b"{"[0], + ] + + + @pytest.mark.parametrize( + ("data", "expected"), + [ + (b"", 0), + (b"\x02\x01\x01\x00Q\xcc", 1), + (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", 1), # valid frame + (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAC", 0), # invalid frame CRC + ], + ) + def test_check_frame(self, rtu_framer, data, expected): + """Test check frame.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + rtu_framer.processIncomingPacket(data, callback, self.slaves) + assert count == expected + + + @pytest.mark.parametrize( + ("data", "header", "res"), + [ + (b"", {"uid": 0x00, "len": 0, "crc": b"\x00\x00"}, 0), + (b"abcd", {"uid": 0x00, "len": 2, "crc": b"\x00\x00"}, 0), + ( + b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x12\x03", # real case, frame size is 11 + {"uid": 0x00, "len": 11, "crc": b"\x00\x00"}, + 1, + ), + ], + ) + def test_rtu_advance_framer(self, rtu_framer, data, header, res): + """Test rtu advance framer.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + rtu_framer._header = header # pylint: disable=protected-access + rtu_framer.processIncomingPacket(data, callback, self.slaves) + assert count == res + + + @pytest.mark.parametrize("data", [b"", b"abcd"]) + def test_rtu_reset_framer(self, rtu_framer, data): + """Test rtu reset framer.""" + rtu_framer._buffer = data # pylint: disable=protected-access + rtu_framer.resetFrame() + assert rtu_framer._header == { # pylint: disable=protected-access + "lrc": "0000", + "crc": b"\x00\x00", + "len": 0, + "uid": 0x00, + "pid": 0, + "tid": 0, + } + + + @pytest.mark.parametrize( + ("data", "expected"), + [ + (b"", 0), + (b"\x11", 0), + (b"\x11\x03", 0), + (b"\x11\x03\x06", 0), + (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49", 0), + (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", 1), + (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\xAB\xCD", 1), + ], + ) + def test_is_frame_ready(self, rtu_framer, data, expected): + """Test is frame ready.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + rtu_framer.processIncomingPacket(data, callback, self.slaves) + assert count == expected + + + @pytest.mark.parametrize( + "data", + [ + b"", + b"\x11", + b"\x11\x03", + b"\x11\x03\x06", + b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x43", + ], + ) + def test_rtu_populate_header_fail(self, rtu_framer, data): + """Test rtu populate header fail.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + rtu_framer.processIncomingPacket(data, callback, self.slaves) + assert not count + + + @pytest.mark.parametrize( + ("data", "header"), + [ + ( + b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", + { + "crc": b"\x49\xAD", + "uid": 17, + "len": 11, + "tid": 17, + }, + ), + ( + b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x11\x03", + { + "crc": b"\x49\xAD", + "uid": 17, + "len": 11, + "tid": 17, + }, + ), + ], + ) + def test_rtu_populate_header(self, rtu_framer, data, header): + """Test rtu populate header.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + rtu_framer.processIncomingPacket(data, callback, self.slaves) + assert rtu_framer._header == header # pylint: disable=protected-access + + + def test_get_frame(self, rtu_framer): + """Test get frame.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + data = b"\x02\x01\x01\x00Q\xcc" + rtu_framer.processIncomingPacket(data, callback, self.slaves) + assert count + assert result.function_code.to_bytes(1,'big') + result.encode() == b"\x01\x01\x00" + + + def test_populate_result(self, rtu_framer): + """Test populate result.""" + rtu_framer._header["uid"] = 255 # pylint: disable=protected-access + result = mock.Mock() + rtu_framer.populateResult(result) + assert result.slave_id == 255 + + + @pytest.mark.parametrize( + ("data", "slaves", "reset_called", "cb_called"), + [ + (b"\x11", [17], 0, 0), # not complete frame + (b"\x11\x03", [17], 0, 0), # not complete frame + (b"\x11\x03\x06", [17], 0, 0), # not complete frame + (b"\x11\x03\x06\xAE\x41\x56\x52\x43", [17], 0, 0), # not complete frame + ( + b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40", + [17], + 0, + 0, + ), # not complete frame + ( + b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49", + [17], + 0, + 0, + ), # not complete frame + (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAC", [17], 1, 0), # bad crc + ( + b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", + [17], + 0, + 1, + ), # good frame + ( + b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", + [16], + 0, + 0, + ), # incorrect slave id + (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x11\x03", [17], 0, 1), + # good frame + part of next frame + ], + ) + def test_rtu_incoming_packet(self, rtu_framer, data, slaves, reset_called, cb_called): + """Test rtu process incoming packet.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + with mock.patch.object( + rtu_framer, "resetFrame", wraps=rtu_framer.resetFrame + ) as mock_reset: + rtu_framer.processIncomingPacket(data, callback, slaves) + assert count == cb_called + assert mock_reset.call_count == reset_called + + + async def test_send_packet(self, rtu_framer): + """Test send packet.""" + message = TEST_MESSAGE + client = ModbusBaseClient( + Framer.ASCII, + host="localhost", + port=BASE_PORT + 1, + CommType=CommType.TCP, + ) + client.state = ModbusTransactionState.TRANSACTION_COMPLETE + client.silent_interval = 1 + client.last_frame_end = 1 + client.comm_params.timeout_connect = 0.25 + client.idle_time = mock.Mock(return_value=1) + client.send = mock.Mock(return_value=len(message)) + rtu_framer.client = client + assert rtu_framer.sendPacket(message) == len(message) + client.state = ModbusTransactionState.PROCESSING_REPLY + assert rtu_framer.sendPacket(message) == len(message) + + + def test_recv_packet(self, rtu_framer): + """Test receive packet.""" + message = TEST_MESSAGE + client = mock.Mock() + client.recv.return_value = message + rtu_framer.client = client + assert rtu_framer.recvPacket(len(message)) == message + + def test_process(self, rtu_framer): + """Test process.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + data = TEST_MESSAGE + rtu_framer.processIncomingPacket(data, callback, self.slaves) + assert not count + + @pytest.mark.parametrize(("slaves", "res"), [([16], 0), ([17], 1)]) + def test_validate__slave_id(self,rtu_framer, slaves, res): + """Test validate slave.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + data = b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x12\x03" + rtu_framer.processIncomingPacket(data, callback, slaves) + assert count == res + + @pytest.mark.parametrize("data", [b":010100010001FC\r\n", b""]) + def test_decode_ascii_data(self, ascii_framer, data): + """Test decode ascii.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + ascii_framer.processIncomingPacket(data, callback, [1]) + if result: + assert result.slave_id == 1 + assert result.function_code == 1 + else: + assert not result + + def test_recv_split_packet(self): + """Test receive packet.""" + response_ok = False + + def _handle_response(_reply): + """Handle response.""" + nonlocal response_ok + response_ok = True + + message = bytearray(b"\x00\x01\x00\x00\x00\x0b\x01\x03\x08\x00\xb5\x12\x2f\x37\x21\x00\x03") + for i in range(0, len(message)): + part1 = message[:i] + part2 = message[i:] + response_ok = False + framer = ModbusSocketFramer(ClientDecoder()) + if i: + framer.processIncomingPacket(part1, _handle_response, slave=0) + assert not response_ok, "Response should not be accepted" + framer.processIncomingPacket(part2, _handle_response, slave=0) + assert response_ok, "Response is valid, but not accepted" + + + def test_recv_socket_exception_packet(self): + """Test receive packet.""" + response_ok = False + + def _handle_response(_reply): + """Handle response.""" + nonlocal response_ok + response_ok = True + + message = bytearray(b"\x00\x02\x00\x00\x00\x03\x01\x84\x02") + response_ok = False + framer = ModbusSocketFramer(ClientDecoder()) + framer.processIncomingPacket(message, _handle_response, slave=0) + assert response_ok, "Response is valid, but not accepted" + + message = bytearray(b"\x00\x01\x00\x00\x00\x0b\x01\x03\x08\x00\xb5\x12\x2f\x37\x21\x00\x03") + response_ok = False + framer = ModbusSocketFramer(ClientDecoder()) + framer.processIncomingPacket(message, _handle_response, slave=0) + assert response_ok, "Response is valid, but not accepted" + + # ---- 100% coverage + @pytest.mark.parametrize( + ("framer", "message"), + [ + (ModbusAsciiFramer, b':00010001000AF4\r\n',), + (ModbusBinaryFramer, b'{\x00\x01\x00\x01\x00\n\xec\x1c}',), + (ModbusRtuFramer, b"\x00\x01\x00\x01\x00\n\xec\x1c",), + (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x00\x01\x00\x01\x00\n',), + ] + ) + def test_build_packet(self, framer, message): + """Test build packet.""" + test_framer = framer(ClientDecoder()) + request = ReadCoilsRequest(1, 10) + assert test_framer.buildPacket(request) == message + + + @pytest.mark.parametrize( + ("framer", "message"), + [ + (ModbusAsciiFramer, b':01010001000AF3\r\n',), + (ModbusBinaryFramer, b'A{\x01\x01\x00\x01\x00\n\xed\xcd}',), + (ModbusRtuFramer, b"\x01\x01\x03\x01\x00\n\xed\x89",), + (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x01\x01\x00\x01\x00\n',), + ] + ) + @pytest.mark.parametrize(("slave"), [0x01, 0x02]) + def test_processincomingpacket_ok(self, framer, message, slave): + """Test processIncomingPacket.""" + test_framer = framer(ClientDecoder()) + test_framer.processIncomingPacket(message, mock.Mock(), slave) + + + @pytest.mark.parametrize( + ("framer", "message"), + [ + (ModbusAsciiFramer, b':01270001000ACD\r\n',), + (ModbusBinaryFramer, b'{\x01\x1a\x00\x01\x00\n\x89\xcf}',), + (ModbusRtuFramer, b"\x01\x03\x03\x01\x00\n\x94\x49",), + (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x01\x27\x00\x01\x00\n',), + ] + ) + def test_processincomingpacket_not_ok(self, framer, message): + """Test processIncomingPacket.""" + test_framer = framer(ClientDecoder()) + with pytest.raises(ModbusIOException): + test_framer.processIncomingPacket(message, mock.Mock(), 0x01) + + @pytest.mark.parametrize( + ("framer", "message"), + [ + (ModbusAsciiFramer, b':61620001000AF4\r\n',), + (ModbusBinaryFramer, b'{\x61\x62\x00\x01\x00\n\xec\x1c}',), + (ModbusRtuFramer, b"\x61\x62\x00\x01\x00\n\xec\x1c",), + (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x61\x62\x00\x01\x00\n',), + ] + ) + @pytest.mark.parametrize("expected", [{"fcode": 98, "slave": 97}]) + def test_decode_data(self, framer, message, expected): + """Test decode data.""" + test_framer = framer(ClientDecoder()) + decoded = test_framer.decode_data(b'') + assert decoded == {} + decoded = test_framer.decode_data(message) + assert decoded["fcode"] == expected["fcode"] + assert decoded["slave"] == expected["slave"] + + def test_binary_framer_preflight(self): + """Test binary framer _preflight.""" + test_framer = ModbusBinaryFramer(ClientDecoder()) + assert test_framer._preflight(b'A{B}C') == b'A{{B}}C' # pylint: disable=protected-access diff --git a/test/message/to_do_server_multidrop_py b/test/message/to_do_server_multidrop_py new file mode 100644 index 000000000..74711ab5c --- /dev/null +++ b/test/message/to_do_server_multidrop_py @@ -0,0 +1,174 @@ +"""Test server working as slave on a multidrop RS485 line.""" +from unittest import mock + +import pytest + +from pymodbus.framer.rtu_framer import ModbusRtuFramer +from pymodbus.server.async_io import ServerDecoder + + +class TestMultidrop: + """Test that server works on a multidrop line.""" + + slaves = [2] + + good_frame = b"\x02\x03\x00\x01\x00}\xd4\x18" + + @pytest.fixture(name="framer") + def fixture_framer(self): + """Prepare framer.""" + return ModbusRtuFramer(ServerDecoder()) + + @pytest.fixture(name="callback") + def fixture_callback(self): + """Prepare dummy callback.""" + return mock.Mock() + + def test_ok_frame(self, framer, callback): + """Test ok frame.""" + serial_event = self.good_frame + framer.processIncomingPacket(serial_event, callback, self.slaves) + callback.assert_called_once() + + def test_ok_2frame(self, framer, callback): + """Test ok frame.""" + serial_event = self.good_frame + self.good_frame + framer.processIncomingPacket(serial_event, callback, self.slaves) + assert callback.call_count == 2 + + def test_bad_crc(self, framer, callback): + """Test bad crc.""" + serial_event = b"\x02\x03\x00\x01\x00}\xd4\x19" # Manually mangled crc + framer.processIncomingPacket(serial_event, callback, self.slaves) + callback.assert_not_called() + + def test_wrong_id(self, framer, callback): + """Test frame wrong id.""" + serial_event = b"\x01\x03\x00\x01\x00}\xd4+" # Frame with good CRC but other id + framer.processIncomingPacket(serial_event, callback, self.slaves) + callback.assert_not_called() + + def test_big_split_response_frame_from_other_id(self, framer, callback): + """Test split response.""" + # This is a single *response* from device id 1 after being queried for 125 holding register values + # Because the response is so long it spans several serial events + serial_events = [ + b"\x01\x03\xfa\xc4y\xc0\x00\xc4y\xc0\x00\xc4y\xc0\x00\xc4y\xc0\x00\xc4y\xc0\x00Dz\x00\x00C\x96\x00\x00", + b"?\x05\x1e\xb8DH\x00\x00D\x96\x00\x00D\xfa\x00\x00DH\x00\x00D\x96\x00\x00D\xfa\x00\x00DH\x00", + b"\x00D\x96\x00\x00D\xfa\x00\x00B\x96\x00\x00B\xb4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00N,", + ] + for serial_event in serial_events: + framer.processIncomingPacket(serial_event, callback, self.slaves) + callback.assert_not_called() + + def test_split_frame(self, framer, callback): + """Test split frame.""" + serial_events = [self.good_frame[:5], self.good_frame[5:]] + for serial_event in serial_events: + framer.processIncomingPacket(serial_event, callback, self.slaves) + callback.assert_called_once() + + def test_complete_frame_trailing_data_without_id(self, framer, callback): + """Test trailing data.""" + garbage = b"\x05\x04\x03" # without id + serial_event = garbage + self.good_frame + framer.processIncomingPacket(serial_event, callback, self.slaves) + callback.assert_called_once() + + def test_complete_frame_trailing_data_with_id(self, framer, callback): + """Test trailing data.""" + garbage = b"\x05\x04\x03\x02\x01\x00" # with id + serial_event = garbage + self.good_frame + framer.processIncomingPacket(serial_event, callback, self.slaves) + callback.assert_called_once() + + def test_split_frame_trailing_data_with_id(self, framer, callback): + """Test split frame.""" + garbage = b"\x05\x04\x03\x02\x01\x00" + serial_events = [garbage + self.good_frame[:5], self.good_frame[5:]] + for serial_event in serial_events: + framer.processIncomingPacket(serial_event, callback, self.slaves) + callback.assert_called_once() + + def test_coincidental_1(self, framer, callback): + """Test conincidental.""" + garbage = b"\x02\x90\x07" + serial_events = [garbage, self.good_frame[:5], self.good_frame[5:]] + for serial_event in serial_events: + framer.processIncomingPacket(serial_event, callback, self.slaves) + callback.assert_called_once() + + def test_coincidental_2(self, framer, callback): + """Test conincidental.""" + garbage = b"\x02\x10\x07" + serial_events = [garbage, self.good_frame[:5], self.good_frame[5:]] + for serial_event in serial_events: + framer.processIncomingPacket(serial_event, callback, self.slaves) + callback.assert_called_once() + + def test_coincidental_3(self, framer, callback): + """Test conincidental.""" + garbage = b"\x02\x10\x07\x10" + serial_events = [garbage, self.good_frame[:5], self.good_frame[5:]] + for serial_event in serial_events: + framer.processIncomingPacket(serial_event, callback, self.slaves) + callback.assert_called_once() + + def test_wrapped_frame(self, framer, callback): + """Test wrapped frame.""" + garbage = b"\x05\x04\x03\x02\x01\x00" + serial_event = garbage + self.good_frame + garbage + framer.processIncomingPacket(serial_event, callback, self.slaves) + + # We probably should not respond in this case; in this case we've likely become desynchronized + # i.e. this probably represents a case where a command came for us, but we didn't get + # to the serial buffer in time (some other co-routine or perhaps a block on the USB bus) + # and the master moved on and queried another device + callback.assert_called_once() + + def test_frame_with_trailing_data(self, framer, callback): + """Test trailing data.""" + garbage = b"\x05\x04\x03\x02\x01\x00" + serial_event = self.good_frame + garbage + framer.processIncomingPacket(serial_event, callback, self.slaves) + + # We should not respond in this case for identical reasons as test_wrapped_frame + callback.assert_called_once() + + def test_getFrameStart(self, framer): + """Test getFrameStart.""" + result = None + count = 0 + def test_callback(data): + """Check callback.""" + nonlocal result, count + count += 1 + result = data.function_code.to_bytes(1,'big')+data.encode() + + framer_ok = b"\x02\x03\x00\x01\x00\x7d\xd4\x18" + framer.processIncomingPacket(framer_ok, test_callback, self.slaves) + assert framer_ok[1:-2] == result + + count = 0 + framer_2ok = framer_ok + framer_ok + framer.processIncomingPacket(framer_2ok, test_callback, self.slaves) + assert count == 2 + assert not framer._buffer # pylint: disable=protected-access + + framer._buffer = framer_ok[:2] # pylint: disable=protected-access + framer.processIncomingPacket(b'', test_callback, self.slaves) + assert framer_ok[:2] == framer._buffer # pylint: disable=protected-access + + framer._buffer = framer_ok[:3] # pylint: disable=protected-access + framer.processIncomingPacket(b'', test_callback, self.slaves) + assert framer_ok[:3] == framer._buffer # pylint: disable=protected-access + + framer_ok = b"\xF0\x03\x00\x01\x00}\xd4\x18" + framer.processIncomingPacket(framer_ok, test_callback, self.slaves) + assert framer._buffer == framer_ok[-3:] # pylint: disable=protected-access diff --git a/test/message/to_do_transaction_py b/test/message/to_do_transaction_py new file mode 100755 index 000000000..48a182560 --- /dev/null +++ b/test/message/to_do_transaction_py @@ -0,0 +1,782 @@ +"""Test transaction.""" +from binascii import a2b_hex +from itertools import count +from unittest import mock + +import pytest + +from pymodbus.exceptions import ( + InvalidMessageReceivedException, + ModbusIOException, +) +from pymodbus.factory import ServerDecoder +from pymodbus.pdu import ModbusRequest +from pymodbus.transaction import ( + ModbusAsciiFramer, + ModbusBinaryFramer, + ModbusRtuFramer, + ModbusSocketFramer, + ModbusTlsFramer, + ModbusTransactionManager, +) + + +TEST_MESSAGE = b"\x7b\x01\x03\x00\x00\x00\x05\x85\xC9\x7d" + + +class TestTransaction: # pylint: disable=too-many-public-methods + """Unittest for the pymodbus.transaction module.""" + + client = None + decoder = None + _tcp = None + _tls = None + _rtu = None + _ascii = None + _binary = None + _manager = None + _tm = None + + # ----------------------------------------------------------------------- # + # Test Construction + # ----------------------------------------------------------------------- # + def setup_method(self): + """Set up the test environment.""" + self.client = None + self.decoder = ServerDecoder() + self._tcp = ModbusSocketFramer(decoder=self.decoder, client=None) + self._tls = ModbusTlsFramer(decoder=self.decoder, client=None) + self._rtu = ModbusRtuFramer(decoder=self.decoder, client=None) + self._ascii = ModbusAsciiFramer(decoder=self.decoder, client=None) + self._binary = ModbusBinaryFramer(decoder=self.decoder, client=None) + self._manager = ModbusTransactionManager(self.client) + + # ----------------------------------------------------------------------- # + # Modbus transaction manager + # ----------------------------------------------------------------------- # + + def test_calculate_expected_response_length(self): + """Test calculate expected response length.""" + self._manager.client = mock.MagicMock() + self._manager.client.framer = mock.MagicMock() + self._manager._set_adu_size() # pylint: disable=protected-access + assert not self._manager._calculate_response_length( # pylint: disable=protected-access + 0 + ) + self._manager.base_adu_size = 10 + assert ( + self._manager._calculate_response_length(5) # pylint: disable=protected-access + == 15 + ) + + def test_calculate_exception_length(self): + """Test calculate exception length.""" + for framer, exception_length in ( + ("ascii", 11), + ("binary", 7), + ("rtu", 5), + ("tcp", 9), + ("tls", 2), + ("dummy", None), + ): + self._manager.client = mock.MagicMock() + if framer == "ascii": + self._manager.client.framer = self._ascii + elif framer == "binary": + self._manager.client.framer = self._binary + elif framer == "rtu": + self._manager.client.framer = self._rtu + elif framer == "tcp": + self._manager.client.framer = self._tcp + elif framer == "tls": + self._manager.client.framer = self._tls + else: + self._manager.client.framer = mock.MagicMock() + + self._manager._set_adu_size() # pylint: disable=protected-access + assert ( + self._manager._calculate_exception_length() # pylint: disable=protected-access + == exception_length + ) + + @mock.patch("pymodbus.transaction.time") + def test_execute(self, mock_time): + """Test execute.""" + mock_time.time.side_effect = count() + + client = mock.MagicMock() + client.framer = self._ascii + client.framer._buffer = b"deadbeef" # pylint: disable=protected-access + client.framer.processIncomingPacket = mock.MagicMock() + client.framer.processIncomingPacket.return_value = None + client.framer.buildPacket = mock.MagicMock() + client.framer.buildPacket.return_value = b"deadbeef" + client.framer.sendPacket = mock.MagicMock() + client.framer.sendPacket.return_value = len(b"deadbeef") + client.framer.decode_data = mock.MagicMock() + client.framer.decode_data.return_value = { + "slave": 1, + "fcode": 222, + "length": 27, + } + request = mock.MagicMock() + request.get_response_pdu_size.return_value = 10 + request.slave_id = 1 + request.function_code = 222 + trans = ModbusTransactionManager(client) + trans._recv = mock.MagicMock( # pylint: disable=protected-access + return_value=b"abcdef" + ) + assert trans.retries == 3 + assert not trans.retry_on_empty + + trans.getTransaction = mock.MagicMock() + trans.getTransaction.return_value = "response" + response = trans.execute(request) + assert response == "response" + # No response + trans._recv = mock.MagicMock( # pylint: disable=protected-access + return_value=b"abcdef" + ) + trans.transactions = {} + trans.getTransaction = mock.MagicMock() + trans.getTransaction.return_value = None + response = trans.execute(request) + assert isinstance(response, ModbusIOException) + + # No response with retries + trans.retry_on_empty = True + trans._recv = mock.MagicMock( # pylint: disable=protected-access + side_effect=iter([b"", b"abcdef"]) + ) + response = trans.execute(request) + assert isinstance(response, ModbusIOException) + + # wrong handle_local_echo + trans._recv = mock.MagicMock( # pylint: disable=protected-access + side_effect=iter([b"abcdef", b"deadbe", b"123456"]) + ) + client.comm_params.handle_local_echo = True + trans.retry_on_empty = False + trans.retry_on_invalid = False + assert trans.execute(request).message == "[Input/Output] Wrong local echo" + client.comm_params.handle_local_echo = False + + # retry on invalid response + trans.retry_on_invalid = True + trans._recv = mock.MagicMock( # pylint: disable=protected-access + side_effect=iter([b"", b"abcdef", b"deadbe", b"123456"]) + ) + response = trans.execute(request) + assert isinstance(response, ModbusIOException) + + # Unable to decode response + trans._recv = mock.MagicMock( # pylint: disable=protected-access + side_effect=ModbusIOException() + ) + client.framer.processIncomingPacket.side_effect = mock.MagicMock( + side_effect=ModbusIOException() + ) + assert isinstance(trans.execute(request), ModbusIOException) + + # Broadcast + client.params.broadcast_enable = True + request.slave_id = 0 + response = trans.execute(request) + assert response == b"Broadcast write sent - no response expected" + + # Broadcast w/ Local echo + client.comm_params.handle_local_echo = True + client.params.broadcast_enable = True + recv = mock.MagicMock(return_value=b"deadbeef") + trans._recv = recv # pylint: disable=protected-access + request.slave_id = 0 + response = trans.execute(request) + assert response == b"Broadcast write sent - no response expected" + recv.assert_called_once_with(8, False) + client.comm_params.handle_local_echo = False + + def test_transaction_manager_tid(self): + """Test the transaction manager TID.""" + for tid in range(1, self._manager.getNextTID() + 10): + assert tid + 1 == self._manager.getNextTID() + self._manager.reset() + assert self._manager.getNextTID() == 1 + + def test_get_transaction_manager_transaction(self): + """Test the getting a transaction from the transaction manager.""" + + class Request: # pylint: disable=too-few-public-methods + """Request.""" + + self._manager.reset() + handle = Request() + handle.transaction_id = ( # pylint: disable=attribute-defined-outside-init + self._manager.getNextTID() + ) + handle.message = b"testing" # pylint: disable=attribute-defined-outside-init + self._manager.addTransaction(handle) + result = self._manager.getTransaction(handle.transaction_id) + assert handle.message == result.message + + def test_delete_transaction_manager_transaction(self): + """Test deleting a transaction from the dict transaction manager.""" + + class Request: # pylint: disable=too-few-public-methods + """Request.""" + + self._manager.reset() + handle = Request() + handle.transaction_id = ( # pylint: disable=attribute-defined-outside-init + self._manager.getNextTID() + ) + handle.message = b"testing" # pylint: disable=attribute-defined-outside-init + + self._manager.addTransaction(handle) + self._manager.delTransaction(handle.transaction_id) + assert not self._manager.getTransaction(handle.transaction_id) + + # ----------------------------------------------------------------------- # + # TCP tests + # ----------------------------------------------------------------------- # + @pytest.mark.skip() + def test_tcp_framer_transaction_ready(self): + """Test a tcp frame transaction.""" + msg = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12\x34" + assert not self._tcp.isFrameReady() + assert not self._tcp.checkFrame() + self._tcp._buffer = msg # pylint: disable=protected-access + assert self._tcp.isFrameReady() + assert self._tcp.checkFrame() + self._tcp.advanceFrame() + assert not self._tcp.isFrameReady() + assert not self._tcp.checkFrame() + # assert self._ascii.getFrame() == b"" + + @pytest.mark.skip() + def test_tcp_framer_transaction_full(self): + """Test a full tcp frame transaction.""" + msg = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12\x34" + self._tcp._buffer = msg # pylint: disable=protected-access + assert self._tcp.checkFrame() + result = self._tcp.getFrame() + assert result == msg[7:] + self._tcp.advanceFrame() + + @pytest.mark.skip() + def test_tcp_framer_transaction_half(self): + """Test a half completed tcp frame transaction.""" + msg1 = b"\x00\x01\x12\x34\x00" + msg2 = b"\x04\xff\x02\x12\x34" + self._tcp._buffer = msg1 # pylint: disable=protected-access + assert not self._tcp.checkFrame() + result = self._tcp.getFrame() + assert result == b"" + self._tcp._buffer += msg2 + assert self._tcp.checkFrame() + result = self._tcp.getFrame() + assert result == msg2[2:] + self._tcp.advanceFrame() + + @pytest.mark.skip() + def test_tcp_framer_transaction_half2(self): + """Test a half completed tcp frame transaction.""" + msg1 = b"\x00\x01\x12\x34\x00\x04\xff" + msg2 = b"\x02\x12\x34" + self._tcp._buffer = msg1 # pylint: disable=protected-access + assert not self._tcp.checkFrame() + result = self._tcp.getFrame() + assert result == b"" + self._tcp._buffer += msg2 + assert self._tcp.checkFrame() + result = self._tcp.getFrame() + assert msg2 == result + self._tcp.advanceFrame() + + @pytest.mark.skip() + def test_tcp_framer_transaction_half3(self): + """Test a half completed tcp frame transaction.""" + msg1 = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12" + msg2 = b"\x34" + self._tcp._buffer = msg1 # pylint: disable=protected-access + assert not self._tcp.checkFrame() + result = self._tcp.getFrame() + assert result == msg1[7:] + self._tcp._buffer += msg2 + assert self._tcp.checkFrame() + result = self._tcp.getFrame() + assert result == msg1[7:] + msg2 + self._tcp.advanceFrame() + + @pytest.mark.skip() + def test_tcp_framer_transaction_short(self): + """Test that we can get back on track after an invalid message.""" + msg1 = b"\x99\x99\x99\x99\x00\x01\x00\x01" + msg2 = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12\x34" + self._tcp._buffer = msg1 # pylint: disable=protected-access + assert not self._tcp.checkFrame() + result = self._tcp.getFrame() + assert result == b"" + self._tcp.advanceFrame() + self._tcp._buffer += msg2 + assert len(self._tcp._buffer) == 10 # pylint: disable=protected-access + assert self._tcp.checkFrame() + result = self._tcp.getFrame() + assert result == msg2[7:] + self._tcp.advanceFrame() + + @pytest.mark.skip() + def test_tcp_framer_populate(self): + """Test a tcp frame packet build.""" + expected = ModbusRequest() + expected.transaction_id = 0x0001 + expected.protocol_id = 0x1234 + expected.slave_id = 0xFF + msg = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12\x34" + self._tcp._buffer = msg # pylint: disable=protected-access + assert self._tcp.checkFrame() + actual = ModbusRequest() + self._tcp.populateResult(actual) + for name in ("transaction_id", "protocol_id", "slave_id"): + assert getattr(expected, name) == getattr(actual, name) + self._tcp.advanceFrame() + + def test_tcp_framer_packet(self): + """Test a tcp frame packet build.""" + old_encode = ModbusRequest.encode + ModbusRequest.encode = lambda self: b"" + message = ModbusRequest() + message.transaction_id = 0x0001 + message.protocol_id = 0x1234 + message.slave_id = 0xFF + message.function_code = 0x01 + expected = b"\x00\x01\x12\x34\x00\x02\xff\x01" + actual = self._tcp.buildPacket(message) + assert expected == actual + ModbusRequest.encode = old_encode + + # ----------------------------------------------------------------------- # + # TLS tests + # ----------------------------------------------------------------------- # + @pytest.mark.skip() + def test_framer_tls_framer_transaction_ready(self): + """Test a tls frame transaction.""" + msg = b"\x01\x12\x34\x00\x08" + assert not self._tls.isFrameReady() + assert not self._tls.checkFrame() + self._tls._buffer = msg # pylint: disable=protected-access + assert self._tls.isFrameReady() + assert self._tls.checkFrame() + self._tls.advanceFrame() + assert not self._tls.isFrameReady() + assert not self._tls.checkFrame() + assert self._tls.getFrame() == b"" + + @pytest.mark.skip() + def test_framer_tls_framer_transaction_full(self): + """Test a full tls frame transaction.""" + msg = b"\x01\x12\x34\x00\x08" + self._tls._buffer = msg # pylint: disable=protected-access + assert self._tls.checkFrame() + result = self._tls.getFrame() + assert result == msg[0:] + self._tls.advanceFrame() + + @pytest.mark.skip() + def test_framer_tls_framer_transaction_half(self): + """Test a half completed tls frame transaction.""" + msg1 = b"" + msg2 = b"\x01\x12\x34\x00\x08" + self._tls._buffer = msg1 # pylint: disable=protected-access + assert not self._tls.checkFrame() + result = self._tls.getFrame() + assert result == b"" + self._tls._buffer += msg2 + assert self._tls.checkFrame() + result = self._tls.getFrame() + assert result == msg2[0:] + self._tls.advanceFrame() + + @pytest.mark.skip() + def test_framer_tls_framer_transaction_short(self): + """Test that we can get back on track after an invalid message.""" + msg1 = b"" + msg2 = b"\x01\x12\x34\x00\x08" + self._tls._buffer = msg1 # pylint: disable=protected-access + assert not self._tls.checkFrame() + result = self._tls.getFrame() + assert result == b"" + self._tls.advanceFrame() + self._tls._buffer = msg2 # pylint: disable=protected-access + assert len(self._tls._buffer) == 5 # pylint: disable=protected-access + assert self._tls.checkFrame() + result = self._tls.getFrame() + assert result == msg2[0:] + self._tls.advanceFrame() + + @pytest.mark.skip() + def test_framer_tls_framer_decode(self): + """Testmessage decoding.""" + msg1 = b"" + msg2 = b"\x01\x12\x34\x00\x08" + result = self._tls.decode_data(msg1) + assert not result + result = self._tls.decode_data(msg2) + assert result == {"fcode": 1} + self._tls.advanceFrame() + + @pytest.mark.skip() + def test_framer_tls_incoming_packet(self): + """Framer tls incoming packet.""" + msg = b"\x01\x12\x34\x00\x08" + + slave = 0x01 + msg_result = None + + def mock_callback(result): + """Mock callback.""" + nonlocal msg_result + + msg_result = result.encode() + + self._tls.isFrameReady = mock.MagicMock(return_value=False) + self._tls.processIncomingPacket(msg, mock_callback, slave) + assert msg == self._tls._buffer # pylint: disable=protected-access + self._tls.advanceFrame() + + self._tls.isFrameReady = mock.MagicMock(return_value=True) + x = mock.MagicMock(return_value=False) + self._tls._validate_slave_id = x # pylint: disable=protected-access + self._tls.processIncomingPacket(msg, mock_callback, slave) + assert not self._tls._buffer # pylint: disable=protected-access + self._tls.advanceFrame() + x = mock.MagicMock(return_value=True) + self._tls._validate_slave_id = x # pylint: disable=protected-access + self._tls.processIncomingPacket(msg, mock_callback, slave) + assert msg[1:] == msg_result + self._tls.advanceFrame() + + @pytest.mark.skip() + def test_framer_tls_process(self): + """Framer tls process.""" + + class MockResult: # pylint: disable=too-few-public-methods + """Mock result.""" + + def __init__(self, code): + """Init.""" + self.function_code = code + + def mock_callback(_arg): + """Mock callback.""" + + self._tls.decoder.decode = mock.MagicMock(return_value=None) + with pytest.raises(ModbusIOException): + self._tls._process(mock_callback) # pylint: disable=protected-access + + result = MockResult(0x01) + self._tls.decoder.decode = mock.MagicMock(return_value=result) + with pytest.raises(InvalidMessageReceivedException): + self._tls._process( # pylint: disable=protected-access + mock_callback, error=True + ) + self._tls._process(mock_callback) # pylint: disable=protected-access + assert not self._tls._buffer # pylint: disable=protected-access + + @pytest.mark.skip() + def test_framer_tls_framer_populate(self): + """Test a tls frame packet build.""" + ModbusRequest() + msg = b"\x01\x12\x34\x00\x08" + self._tls._buffer = msg # pylint: disable=protected-access + assert self._tls.checkFrame() + actual = ModbusRequest() + self._tls.populateResult(actual) + self._tls.advanceFrame() + + def test_framer_tls_framer_packet(self): + """Test a tls frame packet build.""" + old_encode = ModbusRequest.encode + ModbusRequest.encode = lambda self: b"" + message = ModbusRequest() + message.function_code = 0x01 + expected = b"\x01" + actual = self._tls.buildPacket(message) + assert expected == actual + ModbusRequest.encode = old_encode + + # ----------------------------------------------------------------------- # + # RTU tests + # ----------------------------------------------------------------------- # + @pytest.mark.skip() + def test_rtu_framer_transaction_ready(self): + """Test if the checks for a complete frame work.""" + assert not self._rtu.isFrameReady() + + msg_parts = [b"\x00\x01\x00", b"\x00\x00\x01\xfc\x1b"] + self._rtu._buffer = msg_parts[0] # pylint: disable=protected-access + assert not self._rtu.isFrameReady() + assert not self._rtu.checkFrame() + + self._rtu._buffer += msg_parts[1] + assert self._rtu.isFrameReady() + assert self._rtu.checkFrame() + + @pytest.mark.skip() + def test_rtu_framer_transaction_full(self): + """Test a full rtu frame transaction.""" + msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" + stripped_msg = msg[1:-2] + self._rtu._buffer = msg # pylint: disable=protected-access + assert self._rtu.checkFrame() + result = self._rtu.getFrame() + assert stripped_msg == result + self._rtu.advanceFrame() + + @pytest.mark.skip() + def test_rtu_framer_transaction_half(self): + """Test a half completed rtu frame transaction.""" + msg_parts = [b"\x00\x01\x00", b"\x00\x00\x01\xfc\x1b"] + stripped_msg = b"".join(msg_parts)[1:-2] + self._rtu._buffer = msg_parts[0] # pylint: disable=protected-access + assert not self._rtu.checkFrame() + self._rtu._buffer += msg_parts[1] + assert self._rtu.isFrameReady() + assert self._rtu.checkFrame() + result = self._rtu.getFrame() + assert stripped_msg == result + self._rtu.advanceFrame() + + @pytest.mark.skip() + def test_rtu_framer_populate(self): + """Test a rtu frame packet build.""" + request = ModbusRequest() + msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" + self._rtu._buffer = msg # pylint: disable=protected-access + self._rtu.populateHeader() + self._rtu.populateResult(request) + + header_dict = self._rtu._header # pylint: disable=protected-access + assert len(msg) == header_dict["len"] + assert int(msg[0]) == header_dict["uid"] + assert msg[-2:] == header_dict["crc"] + assert not request.slave_id + + def test_rtu_framer_packet(self): + """Test a rtu frame packet build.""" + old_encode = ModbusRequest.encode + ModbusRequest.encode = lambda self: b"" + message = ModbusRequest() + message.slave_id = 0xFF + message.function_code = 0x01 + expected = b"\xff\x01\x81\x80" # only header + CRC - no data + actual = self._rtu.buildPacket(message) + assert expected == actual + ModbusRequest.encode = old_encode + + @pytest.mark.skip() + def test_rtu_decode_exception(self): + """Test that the RTU framer can decode errors.""" + message = b"\x00\x90\x02\x9c\x01" + self._rtu._buffer = message # pylint: disable=protected-access + result = self._rtu.checkFrame() + assert result + + @pytest.mark.skip() + def test_process(self): + """Test process.""" + + class MockResult: # pylint: disable=too-few-public-methods + """Mock result.""" + + def __init__(self, code): + self.function_code = code + + def mock_callback(_arg): + """Mock callback.""" + + mock_result = MockResult(code=0) + self._rtu.getFrame = mock.MagicMock() + self._rtu.decoder = mock.MagicMock() + self._rtu.decoder.decode = mock.MagicMock(return_value=mock_result) + self._rtu.populateResult = mock.MagicMock() + self._rtu.advanceFrame = mock.MagicMock() + + self._rtu._process(mock_callback) # pylint: disable=protected-access + self._rtu.populateResult.assert_called_with(mock_result) + self._rtu.advanceFrame.assert_called_with() + assert self._rtu.advanceFrame.called + + # Check errors + self._rtu.decoder.decode = mock.MagicMock(return_value=None) + with pytest.raises(ModbusIOException): + self._rtu._process(mock_callback) # pylint: disable=protected-access + + @pytest.mark.skip() + def test_rtu_process_incoming_packets(self): + """Test rtu process incoming packets.""" + mock_data = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" + slave = 0x00 + + def mock_callback(): + """Mock callback.""" + + self._rtu._buffer = mock.MagicMock() # pylint: disable=protected-access + self._rtu._process = mock.MagicMock() # pylint: disable=protected-access + self._rtu.isFrameReady = mock.MagicMock(return_value=False) + self._rtu._buffer = mock_data # pylint: disable=protected-access + + self._rtu.processIncomingPacket(mock_data, mock_callback, slave) + + # ----------------------------------------------------------------------- # + # ASCII tests + # ----------------------------------------------------------------------- # + @pytest.mark.skip() + def test_ascii_framer_transaction_ready(self): + """Test a ascii frame transaction.""" + msg = b":F7031389000A60\r\n" + assert not self._ascii.isFrameReady() + assert not self._ascii.checkFrame() + self._ascii._buffer = msg # pylint: disable=protected-access + assert self._ascii.isFrameReady() + assert self._ascii.checkFrame() + self._ascii.advanceFrame() + assert not self._ascii.isFrameReady() + assert not self._ascii.checkFrame() + assert not self._ascii.getFrame() + + @pytest.mark.skip() + def test_ascii_framer_transaction_full(self): + """Test a full ascii frame transaction.""" + msg = b"sss:F7031389000A60\r\n" + pack = a2b_hex(msg[6:-4]) + self._ascii._buffer = msg # pylint: disable=protected-access + assert self._ascii.checkFrame() + result = self._ascii.getFrame() + assert pack == result + self._ascii.advanceFrame() + + @pytest.mark.skip() + def test_ascii_framer_transaction_half(self): + """Test a half completed ascii frame transaction.""" + msg1 = b"sss:F7031389" + msg2 = b"000A60\r\n" + pack = a2b_hex(msg1[6:] + msg2[:-4]) + self._ascii._buffer = msg1 # pylint: disable=protected-access + assert not self._ascii.checkFrame() + result = self._ascii.getFrame() + assert not result + self._ascii._buffer += msg2 + assert self._ascii.checkFrame() + result = self._ascii.getFrame() + assert pack == result + self._ascii.advanceFrame() + + def test_ascii_framer_populate(self): + """Test a ascii frame packet build.""" + request = ModbusRequest() + self._ascii.populateResult(request) + assert not request.slave_id + + def test_ascii_framer_packet(self): + """Test a ascii frame packet build.""" + old_encode = ModbusRequest.encode + ModbusRequest.encode = lambda self: b"" + message = ModbusRequest() + message.slave_id = 0xFF + message.function_code = 0x01 + expected = b":FF0100\r\n" + actual = self._ascii.buildPacket(message) + assert expected == actual + ModbusRequest.encode = old_encode + + def test_ascii_process_incoming_packets(self): + """Test ascii process incoming packet.""" + mock_data = b":F7031389000A60\r\n" + slave = 0x00 + + def mock_callback(_mock_data, *_args, **_kwargs): + """Mock callback.""" + + self._ascii.processIncomingPacket(mock_data, mock_callback, slave) + + # Test failure: + self._ascii.checkFrame = mock.MagicMock(return_value=False) + self._ascii.processIncomingPacket(mock_data, mock_callback, slave) + + # ----------------------------------------------------------------------- # + # Binary tests + # ----------------------------------------------------------------------- # + @pytest.mark.skip() + def test_binary_framer_transaction_ready(self): + """Test a binary frame transaction.""" + msg = TEST_MESSAGE + assert not self._binary.isFrameReady() + assert not self._binary.checkFrame() + self._binary._buffer = msg # pylint: disable=protected-access + assert self._binary.isFrameReady() + assert self._binary.checkFrame() + self._binary.advanceFrame() + assert not self._binary.isFrameReady() + assert not self._binary.checkFrame() + assert not self._binary.getFrame() + + @pytest.mark.skip() + def test_binary_framer_transaction_full(self): + """Test a full binary frame transaction.""" + msg = TEST_MESSAGE + pack = msg[2:-3] + self._binary._buffer = msg # pylint: disable=protected-access + assert self._binary.checkFrame() + result = self._binary.getFrame() + assert pack == result + self._binary.advanceFrame() + + @pytest.mark.skip() + def test_binary_framer_transaction_half(self): + """Test a half completed binary frame transaction.""" + msg1 = b"\x7b\x01\x03\x00" + msg2 = b"\x00\x00\x05\x85\xC9\x7d" + pack = msg1[2:] + msg2[:-3] + self._binary._buffer = msg1 # pylint: disable=protected-access + assert not self._binary.checkFrame() + result = self._binary.getFrame() + assert not result + self._binary._buffer += msg2 + + assert self._binary.checkFrame() + result = self._binary.getFrame() + assert pack == result + self._binary.advanceFrame() + + def test_binary_framer_populate(self): + """Test a binary frame packet build.""" + request = ModbusRequest() + self._binary.populateResult(request) + assert not request.slave_id + + def test_binary_framer_packet(self): + """Test a binary frame packet build.""" + old_encode = ModbusRequest.encode + ModbusRequest.encode = lambda self: b"" + message = ModbusRequest() + message.slave_id = 0xFF + message.function_code = 0x01 + expected = b"\x7b\xff\x01\x81\x80\x7d" + actual = self._binary.buildPacket(message) + assert expected == actual + ModbusRequest.encode = old_encode + + def test_binary_process_incoming_packet(self): + """Test binary process incoming packet.""" + mock_data = TEST_MESSAGE + slave = 0x00 + + def mock_callback(_mock_data): + pass + + self._binary.processIncomingPacket(mock_data, mock_callback, slave) + + # Test failure: + self._binary.checkFrame = mock.MagicMock(return_value=False) + self._binary.processIncomingPacket(mock_data, mock_callback, slave) diff --git a/test/sub_server/test_server_multidrop.py b/test/sub_server/test_server_multidrop.py index 77e78eefc..74711ab5c 100644 --- a/test/sub_server/test_server_multidrop.py +++ b/test/sub_server/test_server_multidrop.py @@ -143,27 +143,32 @@ def test_frame_with_trailing_data(self, framer, callback): def test_getFrameStart(self, framer): """Test getFrameStart.""" - framer_ok = b"\x02\x03\x00\x01\x00}\xd4\x18" - framer._buffer = framer_ok # pylint: disable=protected-access - assert framer.getFrameStart(self.slaves, False, False) - assert framer_ok == framer._buffer # pylint: disable=protected-access - + result = None + count = 0 + def test_callback(data): + """Check callback.""" + nonlocal result, count + count += 1 + result = data.function_code.to_bytes(1,'big')+data.encode() + + framer_ok = b"\x02\x03\x00\x01\x00\x7d\xd4\x18" + framer.processIncomingPacket(framer_ok, test_callback, self.slaves) + assert framer_ok[1:-2] == result + + count = 0 framer_2ok = framer_ok + framer_ok - framer._buffer = framer_2ok # pylint: disable=protected-access - assert framer.getFrameStart(self.slaves, False, False) - assert framer_2ok == framer._buffer # pylint: disable=protected-access - assert framer.getFrameStart(self.slaves, False, True) - assert framer_ok == framer._buffer # pylint: disable=protected-access + framer.processIncomingPacket(framer_2ok, test_callback, self.slaves) + assert count == 2 + assert not framer._buffer # pylint: disable=protected-access framer._buffer = framer_ok[:2] # pylint: disable=protected-access - assert not framer.getFrameStart(self.slaves, False, False) + framer.processIncomingPacket(b'', test_callback, self.slaves) assert framer_ok[:2] == framer._buffer # pylint: disable=protected-access framer._buffer = framer_ok[:3] # pylint: disable=protected-access - assert not framer.getFrameStart(self.slaves, False, False) + framer.processIncomingPacket(b'', test_callback, self.slaves) assert framer_ok[:3] == framer._buffer # pylint: disable=protected-access framer_ok = b"\xF0\x03\x00\x01\x00}\xd4\x18" - framer._buffer = framer_ok # pylint: disable=protected-access - assert not framer.getFrameStart(self.slaves, False, False) + framer.processIncomingPacket(framer_ok, test_callback, self.slaves) assert framer._buffer == framer_ok[-3:] # pylint: disable=protected-access diff --git a/test/test_framers.py b/test/test_framers.py index a24e8bf25..0eafac228 100644 --- a/test/test_framers.py +++ b/test/test_framers.py @@ -24,18 +24,25 @@ TEST_MESSAGE = b"\x00\x01\x00\x01\x00\n\xec\x1c" -@pytest.fixture(name="rtu_framer") -def fixture_rtu_framer(): - """RTU framer.""" - return ModbusRtuFramer(ClientDecoder()) +class TestFramers: + """Test framers.""" -@pytest.fixture(name="ascii_framer") -def fixture_ascii_framer(): - """Ascii framer.""" - return ModbusAsciiFramer(ClientDecoder()) + slaves = [2, 17] + @staticmethod + @pytest.fixture(name="rtu_framer") + def fixture_rtu_framer(): + """RTU framer.""" + return ModbusRtuFramer(ClientDecoder()) -@pytest.mark.parametrize( + @staticmethod + @pytest.fixture(name="ascii_framer") + def fixture_ascii_framer(): + """Ascii framer.""" + return ModbusAsciiFramer(ClientDecoder()) + + + @pytest.mark.parametrize( "framer", [ ModbusRtuFramer, @@ -43,415 +50,472 @@ def fixture_ascii_framer(): ModbusBinaryFramer, ], ) -def test_framer_initialization(framer): - """Test framer initialization.""" - decoder = ClientDecoder() - framer = framer(decoder) - assert framer.client is None - assert framer._buffer == b"" # pylint: disable=protected-access - assert framer.decoder == decoder - if isinstance(framer, ModbusAsciiFramer): - assert framer._header == { # pylint: disable=protected-access - "tid": 0, - "pid": 0, + def test_framer_initialization(self, framer): + """Test framer initialization.""" + decoder = ClientDecoder() + framer = framer(decoder) + assert framer.client is None + assert framer._buffer == b"" # pylint: disable=protected-access + assert framer.decoder == decoder + if isinstance(framer, ModbusAsciiFramer): + assert framer._header == { # pylint: disable=protected-access + "tid": 0, + "pid": 0, + "lrc": "0000", + "len": 0, + "uid": 0x00, + "crc": b"\x00\x00", + } + assert framer._hsize == 0x02 # pylint: disable=protected-access + assert framer._start == b":" # pylint: disable=protected-access + assert framer._end == b"\r\n" # pylint: disable=protected-access + elif isinstance(framer, ModbusRtuFramer): + assert framer._header == { # pylint: disable=protected-access + "tid": 0, + "pid": 0, + "lrc": "0000", + "uid": 0x00, + "len": 0, + "crc": b"\x00\x00", + } + assert framer._hsize == 0x01 # pylint: disable=protected-access + assert framer._end == b"\x0d\x0a" # pylint: disable=protected-access + assert framer._min_frame_size == 4 # pylint: disable=protected-access + else: + assert framer._header == { # pylint: disable=protected-access + "tid": 0, + "pid": 0, + "lrc": "0000", + "crc": b"\x00\x00", + "len": 0, + "uid": 0x00, + } + assert framer._hsize == 0x01 # pylint: disable=protected-access + assert framer._start == b"\x7b" # pylint: disable=protected-access + assert framer._end == b"\x7d" # pylint: disable=protected-access + assert framer._repeat == [ # pylint: disable=protected-access + b"}"[0], + b"{"[0], + ] + + + @pytest.mark.parametrize( + ("data", "expected"), + [ + (b"", 0), + (b"\x02\x01\x01\x00Q\xcc", 1), + (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", 1), # valid frame + (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAC", 0), # invalid frame CRC + ], + ) + def test_check_frame(self, rtu_framer, data, expected): + """Test check frame.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + rtu_framer.processIncomingPacket(data, callback, self.slaves) + assert count == expected + + + @pytest.mark.parametrize( + ("data", "header", "res"), + [ + (b"", {"uid": 0x00, "len": 0, "crc": b"\x00\x00"}, 0), + (b"abcd", {"uid": 0x00, "len": 2, "crc": b"\x00\x00"}, 0), + ( + b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x12\x03", # real case, frame size is 11 + {"uid": 0x00, "len": 11, "crc": b"\x00\x00"}, + 1, + ), + ], + ) + def test_rtu_advance_framer(self, rtu_framer, data, header, res): + """Test rtu advance framer.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + rtu_framer._header = header # pylint: disable=protected-access + rtu_framer.processIncomingPacket(data, callback, self.slaves) + assert count == res + + + @pytest.mark.parametrize("data", [b"", b"abcd"]) + def test_rtu_reset_framer(self, rtu_framer, data): + """Test rtu reset framer.""" + rtu_framer._buffer = data # pylint: disable=protected-access + rtu_framer.resetFrame() + assert rtu_framer._header == { # pylint: disable=protected-access "lrc": "0000", + "crc": b"\x00\x00", "len": 0, "uid": 0x00, - "crc": b"\x00\x00", - } - assert framer._hsize == 0x02 # pylint: disable=protected-access - assert framer._start == b":" # pylint: disable=protected-access - assert framer._end == b"\r\n" # pylint: disable=protected-access - elif isinstance(framer, ModbusRtuFramer): - assert framer._header == { # pylint: disable=protected-access - "tid": 0, "pid": 0, - "lrc": "0000", - "uid": 0x00, - "len": 0, - "crc": b"\x00\x00", - } - assert framer._hsize == 0x01 # pylint: disable=protected-access - assert framer._end == b"\x0d\x0a" # pylint: disable=protected-access - assert framer._min_frame_size == 4 # pylint: disable=protected-access - else: - assert framer._header == { # pylint: disable=protected-access "tid": 0, - "pid": 0, - "lrc": "0000", - "crc": b"\x00\x00", - "len": 0, - "uid": 0x00, } - assert framer._hsize == 0x01 # pylint: disable=protected-access - assert framer._start == b"\x7b" # pylint: disable=protected-access - assert framer._end == b"\x7d" # pylint: disable=protected-access - assert framer._repeat == [ # pylint: disable=protected-access - b"}"[0], - b"{"[0], - ] - - -@pytest.mark.parametrize( - "data", - [ - (b"", False), - (b"\x02\x01\x01\x00Q\xcc", True), - (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", True), # valid frame - (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAC", False), # invalid frame CRC - ], -) -def test_check_frame(rtu_framer, data): - """Test check frame.""" - data, expected = data - rtu_framer._buffer = data # pylint: disable=protected-access - assert expected == rtu_framer.checkFrame() - - -@pytest.mark.parametrize( - "data", - [ - (b"", {"uid": 0x00, "len": 0, "crc": b"\x00\x00"}, b""), - (b"abcd", {"uid": 0x00, "len": 2, "crc": b"\x00\x00"}, b"cd"), - ( - b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x12\x03", # real case, frame size is 11 - {"uid": 0x00, "len": 11, "crc": b"\x00\x00"}, - b"\x12\x03", - ), - ], -) -def test_rtu_advance_framer(rtu_framer, data): - """Test rtu advance framer.""" - before_buf, before_header, after_buf = data - - rtu_framer._buffer = before_buf # pylint: disable=protected-access - rtu_framer._header = before_header # pylint: disable=protected-access - rtu_framer.advanceFrame() - assert rtu_framer._header == { # pylint: disable=protected-access - "uid": 0x00, - "len": 0, - "crc": b"\x00\x00", - } - assert rtu_framer._buffer == after_buf # pylint: disable=protected-access - - -@pytest.mark.parametrize("data", [b"", b"abcd"]) -def test_rtu_reset_framer(rtu_framer, data): - """Test rtu reset framer.""" - rtu_framer._buffer = data # pylint: disable=protected-access - rtu_framer.resetFrame() - assert rtu_framer._header == { # pylint: disable=protected-access - "lrc": "0000", - "crc": b"\x00\x00", - "len": 0, - "uid": 0x00, - "pid": 0, - "tid": 0, - } - - -@pytest.mark.parametrize( - "data", - [ - (b"", False), - (b"\x11", False), - (b"\x11\x03", False), - (b"\x11\x03\x06", False), - (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49", False), - (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", True), - (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\xAB\xCD", True), - ], -) -def test_is_frame_ready(rtu_framer, data): - """Test is frame ready.""" - data, expected = data - rtu_framer._buffer = data # pylint: disable=protected-access - # rtu_framer.advanceFrame() - assert rtu_framer.isFrameReady() == expected - - -@pytest.mark.parametrize( - "data", - [ - b"", - b"\x11", - b"\x11\x03", - b"\x11\x03\x06", - b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x43", - ], -) -def test_rtu_populate_header_fail(rtu_framer, data): - """Test rtu populate header fail.""" - with pytest.raises(IndexError): - rtu_framer.populateHeader(data) - -@pytest.mark.parametrize( - "data", - [ - ( - b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", - { - "crc": b"\x49\xAD", - "uid": 17, - "len": 11, - "lrc": "0000", - "tid": 17, - "pid": 0, - }, - ), - ( - b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x11\x03", - { - "crc": b"\x49\xAD", - "uid": 17, - "len": 11, - "lrc": "0000", - "tid": 17, - "pid": 0, - }, - ), - ], -) -def test_rtu_populate_header(rtu_framer, data): - """Test rtu populate header.""" - buffer, expected = data - rtu_framer.populateHeader(buffer) - assert rtu_framer._header == expected # pylint: disable=protected-access - - -def test_get_frame(rtu_framer): - """Test get frame.""" - rtu_framer._buffer = b"\x02\x01\x01\x00Q\xcc" # pylint: disable=protected-access - rtu_framer.populateHeader(b"\x02\x01\x01\x00Q\xcc") - assert rtu_framer.getFrame() == b"\x01\x01\x00" + @pytest.mark.parametrize( + ("data", "expected"), + [ + (b"", 0), + (b"\x11", 0), + (b"\x11\x03", 0), + (b"\x11\x03\x06", 0), + (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49", 0), + (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", 1), + (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\xAB\xCD", 1), + ], + ) + def test_is_frame_ready(self, rtu_framer, data, expected): + """Test is frame ready.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + rtu_framer.processIncomingPacket(data, callback, self.slaves) + assert count == expected + + + @pytest.mark.parametrize( + "data", + [ + b"", + b"\x11", + b"\x11\x03", + b"\x11\x03\x06", + b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x43", + ], + ) + def test_rtu_populate_header_fail(self, rtu_framer, data): + """Test rtu populate header fail.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + rtu_framer.processIncomingPacket(data, callback, self.slaves) + assert not count + + + @pytest.mark.parametrize( + ("data", "header"), + [ + ( + b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", + { + "crc": b"\x49\xAD", + "uid": 17, + "len": 11, + "tid": 17, + }, + ), + ( + b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x11\x03", + { + "crc": b"\x49\xAD", + "uid": 17, + "len": 11, + "tid": 17, + }, + ), + ], + ) + def test_rtu_populate_header(self, rtu_framer, data, header): + """Test rtu populate header.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + rtu_framer.processIncomingPacket(data, callback, self.slaves) + assert rtu_framer._header == header # pylint: disable=protected-access + + + def test_get_frame(self, rtu_framer): + """Test get frame.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + data = b"\x02\x01\x01\x00Q\xcc" + rtu_framer.processIncomingPacket(data, callback, self.slaves) + assert count + assert result.function_code.to_bytes(1,'big') + result.encode() == b"\x01\x01\x00" + + + def test_populate_result(self, rtu_framer): + """Test populate result.""" + rtu_framer._header["uid"] = 255 # pylint: disable=protected-access + result = mock.Mock() + rtu_framer.populateResult(result) + assert result.slave_id == 255 + + + @pytest.mark.parametrize( + ("data", "slaves", "reset_called", "cb_called"), + [ + (b"\x11", [17], 0, 0), # not complete frame + (b"\x11\x03", [17], 0, 0), # not complete frame + (b"\x11\x03\x06", [17], 0, 0), # not complete frame + (b"\x11\x03\x06\xAE\x41\x56\x52\x43", [17], 0, 0), # not complete frame + ( + b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40", + [17], + 0, + 0, + ), # not complete frame + ( + b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49", + [17], + 0, + 0, + ), # not complete frame + (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAC", [17], 1, 0), # bad crc + ( + b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", + [17], + 0, + 1, + ), # good frame + ( + b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", + [16], + 0, + 0, + ), # incorrect slave id + (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x11\x03", [17], 0, 1), + # good frame + part of next frame + ], + ) + def test_rtu_incoming_packet(self, rtu_framer, data, slaves, reset_called, cb_called): + """Test rtu process incoming packet.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + with mock.patch.object( + rtu_framer, "resetFrame", wraps=rtu_framer.resetFrame + ) as mock_reset: + rtu_framer.processIncomingPacket(data, callback, slaves) + assert count == cb_called + assert mock_reset.call_count == reset_called + + + async def test_send_packet(self, rtu_framer): + """Test send packet.""" + message = TEST_MESSAGE + client = ModbusBaseClient( + Framer.ASCII, + host="localhost", + port=BASE_PORT + 1, + CommType=CommType.TCP, + ) + client.state = ModbusTransactionState.TRANSACTION_COMPLETE + client.silent_interval = 1 + client.last_frame_end = 1 + client.comm_params.timeout_connect = 0.25 + client.idle_time = mock.Mock(return_value=1) + client.send = mock.Mock(return_value=len(message)) + rtu_framer.client = client + assert rtu_framer.sendPacket(message) == len(message) + client.state = ModbusTransactionState.PROCESSING_REPLY + assert rtu_framer.sendPacket(message) == len(message) + + + def test_recv_packet(self, rtu_framer): + """Test receive packet.""" + message = TEST_MESSAGE + client = mock.Mock() + client.recv.return_value = message + rtu_framer.client = client + assert rtu_framer.recvPacket(len(message)) == message + + def test_process(self, rtu_framer): + """Test process.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + data = TEST_MESSAGE + rtu_framer.processIncomingPacket(data, callback, self.slaves) + assert not count + + @pytest.mark.parametrize(("slaves", "res"), [([16], 0), ([17], 1)]) + def test_validate__slave_id(self,rtu_framer, slaves, res): + """Test validate slave.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + data = b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x12\x03" + rtu_framer.processIncomingPacket(data, callback, slaves) + assert count == res + + @pytest.mark.parametrize("data", [b":010100010001FC\r\n", b""]) + def test_decode_ascii_data(self, ascii_framer, data): + """Test decode ascii.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + ascii_framer.processIncomingPacket(data, callback, [1]) + if result: + assert result.slave_id == 1 + assert result.function_code == 1 + else: + assert not result + + def test_recv_split_packet(self): + """Test receive packet.""" + response_ok = False -def test_populate_result(rtu_framer): - """Test populate result.""" - rtu_framer._header["uid"] = 255 # pylint: disable=protected-access - result = mock.Mock() - rtu_framer.populateResult(result) - assert result.slave_id == 255 + def _handle_response(_reply): + """Handle response.""" + nonlocal response_ok + response_ok = True + + message = bytearray(b"\x00\x01\x00\x00\x00\x0b\x01\x03\x08\x00\xb5\x12\x2f\x37\x21\x00\x03") + for i in range(0, len(message)): + part1 = message[:i] + part2 = message[i:] + response_ok = False + framer = ModbusSocketFramer(ClientDecoder()) + if i: + framer.processIncomingPacket(part1, _handle_response, slave=0) + assert not response_ok, "Response should not be accepted" + framer.processIncomingPacket(part2, _handle_response, slave=0) + assert response_ok, "Response is valid, but not accepted" + + + def test_recv_socket_exception_packet(self): + """Test receive packet.""" + response_ok = False + def _handle_response(_reply): + """Handle response.""" + nonlocal response_ok + response_ok = True -@pytest.mark.parametrize( - "data", - [ - (b"\x11", 17, False, False), # not complete frame - (b"\x11\x03", 17, False, False), # not complete frame - (b"\x11\x03\x06", 17, False, False), # not complete frame - (b"\x11\x03\x06\xAE\x41\x56\x52\x43", 17, False, False), # not complete frame - ( - b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40", - 17, - False, - False, - ), # not complete frame - ( - b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49", - 17, - False, - False, - ), # not complete frame - (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAC", 17, True, False), # bad crc - ( - b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", - 17, - False, - True, - ), # good frame - ( - b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", - 16, - False, - False, - ), # incorrect slave id - (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x11\x03", 17, False, True), - # good frame + part of next frame - ], -) -def test_rtu_incoming_packet(rtu_framer, data): - """Test rtu process incoming packet.""" - buffer, slaves, reset_called, process_called = data - - with mock.patch.object( - rtu_framer, - "_process", - wraps=rtu_framer._process, # pylint: disable=protected-access - ) as mock_process, mock.patch.object( - rtu_framer, "resetFrame", wraps=rtu_framer.resetFrame - ) as mock_reset: - rtu_framer.processIncomingPacket(buffer, mock.Mock(), slaves) - assert mock_process.call_count == (1 if process_called else 0) - assert mock_reset.call_count == (1 if reset_called else 0) - - -async def test_send_packet(rtu_framer): - """Test send packet.""" - message = TEST_MESSAGE - client = ModbusBaseClient( - Framer.ASCII, - host="localhost", - port=BASE_PORT + 1, - CommType=CommType.TCP, - ) - client.state = ModbusTransactionState.TRANSACTION_COMPLETE - client.silent_interval = 1 - client.last_frame_end = 1 - client.comm_params.timeout_connect = 0.25 - client.idle_time = mock.Mock(return_value=1) - client.send = mock.Mock(return_value=len(message)) - rtu_framer.client = client - assert rtu_framer.sendPacket(message) == len(message) - client.state = ModbusTransactionState.PROCESSING_REPLY - assert rtu_framer.sendPacket(message) == len(message) - - -def test_recv_packet(rtu_framer): - """Test receive packet.""" - message = TEST_MESSAGE - client = mock.Mock() - client.recv.return_value = message - rtu_framer.client = client - assert rtu_framer.recvPacket(len(message)) == message - -def test_process(rtu_framer): - """Test process.""" - rtu_framer._buffer = TEST_MESSAGE # pylint: disable=protected-access - with pytest.raises(ModbusIOException): - rtu_framer._process(None) # pylint: disable=protected-access - - -def test_validate__slave_id(rtu_framer): - """Test validate slave.""" - rtu_framer.populateHeader(TEST_MESSAGE) - assert rtu_framer._validate_slave_id([0], False) # pylint: disable=protected-access - assert rtu_framer._validate_slave_id([1], True) # pylint: disable=protected-access - - -@pytest.mark.parametrize("data", [b":010100010001FC\r\n", b""]) -def test_decode_ascii_data(ascii_framer, data): - """Test decode ascii.""" - data = ascii_framer.decode_data(data) - assert isinstance(data, dict) - if data: - assert data.get("slave") == 1 - assert data.get("fcode") == 1 - else: - assert not data - -def test_recv_split_packet(): - """Test receive packet.""" - response_ok = False - - def _handle_response(_reply): - """Handle response.""" - nonlocal response_ok - response_ok = True - - message = bytearray(b"\x00\x01\x00\x00\x00\x0b\x01\x03\x08\x00\xb5\x12\x2f\x37\x21\x00\x03") - for i in range(0, len(message)): - part1 = message[:i] - part2 = message[i:] + message = bytearray(b"\x00\x02\x00\x00\x00\x03\x01\x84\x02") response_ok = False framer = ModbusSocketFramer(ClientDecoder()) - if i: - framer.processIncomingPacket(part1, _handle_response, slave=0) - assert not response_ok, "Response should not be accepted" - framer.processIncomingPacket(part2, _handle_response, slave=0) + framer.processIncomingPacket(message, _handle_response, slave=0) assert response_ok, "Response is valid, but not accepted" + message = bytearray(b"\x00\x01\x00\x00\x00\x0b\x01\x03\x08\x00\xb5\x12\x2f\x37\x21\x00\x03") + response_ok = False + framer = ModbusSocketFramer(ClientDecoder()) + framer.processIncomingPacket(message, _handle_response, slave=0) + assert response_ok, "Response is valid, but not accepted" -def test_recv_socket_exception_packet(): - """Test receive packet.""" - response_ok = False - - def _handle_response(_reply): - """Handle response.""" - nonlocal response_ok - response_ok = True - - message = bytearray(b"\x00\x02\x00\x00\x00\x03\x01\x84\x02") - response_ok = False - framer = ModbusSocketFramer(ClientDecoder()) - framer.processIncomingPacket(message, _handle_response, slave=0) - assert response_ok, "Response is valid, but not accepted" - - message = bytearray(b"\x00\x01\x00\x00\x00\x0b\x01\x03\x08\x00\xb5\x12\x2f\x37\x21\x00\x03") - response_ok = False - framer = ModbusSocketFramer(ClientDecoder()) - framer.processIncomingPacket(message, _handle_response, slave=0) - assert response_ok, "Response is valid, but not accepted" - -# ---- 100% coverage -@pytest.mark.parametrize( - ("framer", "message"), - [ - (ModbusAsciiFramer, b':00010001000AF4\r\n',), - (ModbusBinaryFramer, b'{\x00\x01\x00\x01\x00\n\xec\x1c}',), - (ModbusRtuFramer, b"\x00\x01\x00\x01\x00\n\xec\x1c",), - (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x00\x01\x00\x01\x00\n',), - ] -) -def test_build_packet(framer, message): - """Test build packet.""" - test_framer = framer(ClientDecoder()) - request = ReadCoilsRequest(1, 10) - assert test_framer.buildPacket(request) == message - - -@pytest.mark.parametrize( - ("framer", "message"), - [ - (ModbusAsciiFramer, b':01010001000AF3\r\n',), - (ModbusBinaryFramer, b'A{\x01\x01\x00\x01\x00\n\xed\xcd}',), - (ModbusRtuFramer, b"\x01\x01\x03\x01\x00\n\xed\x89",), - (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x01\x01\x00\x01\x00\n',), - ] -) -@pytest.mark.parametrize(("slave"), [0x01, 0x02]) -def test_processincomingpacket_ok(framer, message, slave): - """Test processIncomingPacket.""" - test_framer = framer(ClientDecoder()) - test_framer.processIncomingPacket(message, mock.Mock(), slave) - - -@pytest.mark.parametrize( - ("framer", "message"), - [ - (ModbusAsciiFramer, b':01270001000ACD\r\n',), - (ModbusBinaryFramer, b'{\x01\x1a\x00\x01\x00\n\x89\xcf}',), - (ModbusRtuFramer, b"\x01\x03\x03\x01\x00\n\x94\x49",), - (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x01\x27\x00\x01\x00\n',), - ] -) -def test_processincomingpacket_not_ok(framer, message): - """Test processIncomingPacket.""" - test_framer = framer(ClientDecoder()) - with pytest.raises(ModbusIOException): - test_framer.processIncomingPacket(message, mock.Mock(), 0x01) - -@pytest.mark.parametrize( - ("framer", "message"), - [ - (ModbusAsciiFramer, b':61620001000AF4\r\n',), - (ModbusBinaryFramer, b'{\x61\x62\x00\x01\x00\n\xec\x1c}',), - (ModbusRtuFramer, b"\x61\x62\x00\x01\x00\n\xec\x1c",), - (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x61\x62\x00\x01\x00\n',), - ] -) -@pytest.mark.parametrize("expected", [{"fcode": 98, "slave": 97}]) -def test_decode_data(framer, message, expected): - """Test decode data.""" - test_framer = framer(ClientDecoder()) - decoded = test_framer.decode_data(b'') - assert decoded == {} - decoded = test_framer.decode_data(message) - assert decoded["fcode"] == expected["fcode"] - assert decoded["slave"] == expected["slave"] - -def test_binary_framer_preflight(): - """Test binary framer _preflight.""" - test_framer = ModbusBinaryFramer(ClientDecoder()) - assert test_framer._preflight(b'A{B}C') == b'A{{B}}C' # pylint: disable=protected-access + # ---- 100% coverage + @pytest.mark.parametrize( + ("framer", "message"), + [ + (ModbusAsciiFramer, b':00010001000AF4\r\n',), + (ModbusBinaryFramer, b'{\x00\x01\x00\x01\x00\n\xec\x1c}',), + (ModbusRtuFramer, b"\x00\x01\x00\x01\x00\n\xec\x1c",), + (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x00\x01\x00\x01\x00\n',), + ] + ) + def test_build_packet(self, framer, message): + """Test build packet.""" + test_framer = framer(ClientDecoder()) + request = ReadCoilsRequest(1, 10) + assert test_framer.buildPacket(request) == message + + + @pytest.mark.parametrize( + ("framer", "message"), + [ + (ModbusAsciiFramer, b':01010001000AF3\r\n',), + (ModbusBinaryFramer, b'A{\x01\x01\x00\x01\x00\n\xed\xcd}',), + (ModbusRtuFramer, b"\x01\x01\x03\x01\x00\n\xed\x89",), + (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x01\x01\x00\x01\x00\n',), + ] + ) + @pytest.mark.parametrize(("slave"), [0x01, 0x02]) + def test_processincomingpacket_ok(self, framer, message, slave): + """Test processIncomingPacket.""" + test_framer = framer(ClientDecoder()) + test_framer.processIncomingPacket(message, mock.Mock(), slave) + + + @pytest.mark.parametrize( + ("framer", "message"), + [ + (ModbusAsciiFramer, b':01270001000ACD\r\n',), + (ModbusBinaryFramer, b'{\x01\x1a\x00\x01\x00\n\x89\xcf}',), + (ModbusRtuFramer, b"\x01\x03\x03\x01\x00\n\x94\x49",), + (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x01\x27\x00\x01\x00\n',), + ] + ) + def test_processincomingpacket_not_ok(self, framer, message): + """Test processIncomingPacket.""" + test_framer = framer(ClientDecoder()) + with pytest.raises(ModbusIOException): + test_framer.processIncomingPacket(message, mock.Mock(), 0x01) + + @pytest.mark.parametrize( + ("framer", "message"), + [ + (ModbusAsciiFramer, b':61620001000AF4\r\n',), + (ModbusBinaryFramer, b'{\x61\x62\x00\x01\x00\n\xec\x1c}',), + (ModbusRtuFramer, b"\x61\x62\x00\x01\x00\n\xec\x1c",), + (ModbusSocketFramer, b'\x00\x00\x00\x00\x00\x06\x61\x62\x00\x01\x00\n',), + ] + ) + @pytest.mark.parametrize("expected", [{"fcode": 98, "slave": 97}]) + def test_decode_data(self, framer, message, expected): + """Test decode data.""" + test_framer = framer(ClientDecoder()) + decoded = test_framer.decode_data(b'') + assert decoded == {} + decoded = test_framer.decode_data(message) + assert decoded["fcode"] == expected["fcode"] + assert decoded["slave"] == expected["slave"] + + def test_binary_framer_preflight(self): + """Test binary framer _preflight.""" + test_framer = ModbusBinaryFramer(ClientDecoder()) + assert test_framer._preflight(b'A{B}C') == b'A{{B}}C' # pylint: disable=protected-access diff --git a/test/test_transaction.py b/test/test_transaction.py index 95bbdc5b3..233730dd1 100755 --- a/test/test_transaction.py +++ b/test/test_transaction.py @@ -1,12 +1,8 @@ """Test transaction.""" -from binascii import a2b_hex from itertools import count from unittest import mock -import pytest - from pymodbus.exceptions import ( - InvalidMessageReceivedException, ModbusIOException, ) from pymodbus.factory import ServerDecoder @@ -241,98 +237,126 @@ class Request: # pylint: disable=too-few-public-methods # ----------------------------------------------------------------------- # def test_tcp_framer_transaction_ready(self): """Test a tcp frame transaction.""" - msg = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12\x34" - assert not self._tcp.isFrameReady() - assert not self._tcp.checkFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x01\x02\x00\x08" + self._tcp.processIncomingPacket(msg, callback, [1]) self._tcp._buffer = msg # pylint: disable=protected-access - assert self._tcp.isFrameReady() - assert self._tcp.checkFrame() - self._tcp.advanceFrame() - assert not self._tcp.isFrameReady() - assert not self._tcp.checkFrame() - assert self._ascii.getFrame() == b"" def test_tcp_framer_transaction_full(self): """Test a full tcp frame transaction.""" - msg = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12\x34" - self._tcp._buffer = msg # pylint: disable=protected-access - assert self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == msg[7:] - self._tcp.advanceFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x01\x02\x00\x08" + self._tcp.processIncomingPacket(msg, callback, [0, 1]) + assert result.function_code.to_bytes(1,'big') + result.encode() == msg[7:] def test_tcp_framer_transaction_half(self): """Test a half completed tcp frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + msg1 = b"\x00\x01\x12\x34\x00" - msg2 = b"\x04\xff\x02\x12\x34" - self._tcp._buffer = msg1 # pylint: disable=protected-access - assert not self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == b"" - self._tcp._buffer += msg2 - assert self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == msg2[2:] - self._tcp.advanceFrame() + msg2 = b"\x06\xff\x02\x01\x02\x00\x08" + self._tcp.processIncomingPacket(msg1, callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg2, callback, [0, 1]) + assert result + assert result.function_code.to_bytes(1,'big') + result.encode() == msg2[2:] def test_tcp_framer_transaction_half2(self): """Test a half completed tcp frame transaction.""" - msg1 = b"\x00\x01\x12\x34\x00\x04\xff" - msg2 = b"\x02\x12\x34" - self._tcp._buffer = msg1 # pylint: disable=protected-access - assert not self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == b"" - self._tcp._buffer += msg2 - assert self._tcp.checkFrame() - result = self._tcp.getFrame() - assert msg2 == result - self._tcp.advanceFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg1 = b"\x00\x01\x12\x34\x00\x06\xff" + msg2 = b"\x02\x01\x02\x00\x08" + self._tcp.processIncomingPacket(msg1, callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg2, callback, [0, 1]) + assert result + assert result.function_code.to_bytes(1,'big') + result.encode() == msg2 def test_tcp_framer_transaction_half3(self): """Test a half completed tcp frame transaction.""" - msg1 = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12" - msg2 = b"\x34" - self._tcp._buffer = msg1 # pylint: disable=protected-access - assert not self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == msg1[7:] - self._tcp._buffer += msg2 - assert self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == msg1[7:] + msg2 - self._tcp.advanceFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg1 = b"\x00\x01\x12\x34\x00\x06\xff\x02\x01\x02\x00" + msg2 = b"\x08" + self._tcp.processIncomingPacket(msg1, callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg2, callback, [0, 1]) + assert result + assert result.function_code.to_bytes(1,'big') + result.encode() == msg1[7:] + msg2 def test_tcp_framer_transaction_short(self): """Test that we can get back on track after an invalid message.""" - msg1 = b"\x99\x99\x99\x99\x00\x01\x00\x01" - msg2 = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12\x34" - self._tcp._buffer = msg1 # pylint: disable=protected-access - assert not self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == b"" - self._tcp.advanceFrame() - self._tcp._buffer += msg2 - assert len(self._tcp._buffer) == 10 # pylint: disable=protected-access - assert self._tcp.checkFrame() - result = self._tcp.getFrame() - assert result == msg2[7:] - self._tcp.advanceFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + # msg1 = b"\x99\x99\x99\x99\x00\x01\x00\x17" + msg1 = b'' + msg2 = b"\x00\x01\x12\x34\x00\x06\xff\x02\x01\x02\x00\x08" + self._tcp.processIncomingPacket(msg1, callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg2, callback, [0, 1]) + assert result + assert result.function_code.to_bytes(1,'big') + result.encode() == msg2[7:] def test_tcp_framer_populate(self): """Test a tcp frame packet build.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + expected = ModbusRequest() expected.transaction_id = 0x0001 expected.protocol_id = 0x1234 expected.slave_id = 0xFF - msg = b"\x00\x01\x12\x34\x00\x04\xff\x02\x12\x34" - self._tcp._buffer = msg # pylint: disable=protected-access - assert self._tcp.checkFrame() - actual = ModbusRequest() - self._tcp.populateResult(actual) - for name in ("transaction_id", "protocol_id", "slave_id"): - assert getattr(expected, name) == getattr(actual, name) - self._tcp.advanceFrame() + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg, callback, [0, 1]) + # assert self._tcp.checkFrame() + # actual = ModbusRequest() + # self._tcp.populateResult(actual) + # for name in ("transaction_id", "protocol_id", "slave_id"): + # assert getattr(expected, name) == getattr(actual, name) def test_tcp_framer_packet(self): """Test a tcp frame packet build.""" @@ -353,55 +377,65 @@ def test_tcp_framer_packet(self): # ----------------------------------------------------------------------- # def test_framer_tls_framer_transaction_ready(self): """Test a tls frame transaction.""" - msg = b"\x01\x12\x34\x00\x08" - assert not self._tls.isFrameReady() - assert not self._tls.checkFrame() - self._tls._buffer = msg # pylint: disable=protected-access - assert self._tls.isFrameReady() - assert self._tls.checkFrame() - self._tls.advanceFrame() - assert not self._tls.isFrameReady() - assert not self._tls.checkFrame() - assert self._tls.getFrame() == b"" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg[0:4], callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg[4:], callback, [0, 1]) + assert result def test_framer_tls_framer_transaction_full(self): """Test a full tls frame transaction.""" - msg = b"\x01\x12\x34\x00\x08" - self._tls._buffer = msg # pylint: disable=protected-access - assert self._tls.checkFrame() - result = self._tls.getFrame() - assert result == msg[0:] - self._tls.advanceFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg, callback, [0, 1]) + assert result def test_framer_tls_framer_transaction_half(self): """Test a half completed tls frame transaction.""" - msg1 = b"" - msg2 = b"\x01\x12\x34\x00\x08" - self._tls._buffer = msg1 # pylint: disable=protected-access - assert not self._tls.checkFrame() - result = self._tls.getFrame() - assert result == b"" - self._tls._buffer += msg2 - assert self._tls.checkFrame() - result = self._tls.getFrame() - assert result == msg2[0:] - self._tls.advanceFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg[0:8], callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg[8:], callback, [0, 1]) + assert result def test_framer_tls_framer_transaction_short(self): """Test that we can get back on track after an invalid message.""" - msg1 = b"" - msg2 = b"\x01\x12\x34\x00\x08" - self._tls._buffer = msg1 # pylint: disable=protected-access - assert not self._tls.checkFrame() - result = self._tls.getFrame() - assert result == b"" - self._tls.advanceFrame() - self._tls._buffer = msg2 # pylint: disable=protected-access - assert len(self._tls._buffer) == 5 # pylint: disable=protected-access - assert self._tls.checkFrame() - result = self._tls.getFrame() - assert result == msg2[0:] - self._tls.advanceFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg[0:2], callback, [0, 1]) + assert not result + self._tcp.processIncomingPacket(msg[2:], callback, [0, 1]) + assert result def test_framer_tls_framer_decode(self): """Testmessage decoding.""" @@ -411,70 +445,73 @@ def test_framer_tls_framer_decode(self): assert not result result = self._tls.decode_data(msg2) assert result == {"fcode": 1} - self._tls.advanceFrame() def test_framer_tls_incoming_packet(self): """Framer tls incoming packet.""" - msg = b"\x01\x12\x34\x00\x08" + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" slave = 0x01 + msg_result = None - def mock_callback(): + def mock_callback(result): """Mock callback.""" + nonlocal msg_result - self._tls._process = mock.MagicMock() # pylint: disable=protected-access - self._tls.isFrameReady = mock.MagicMock(return_value=False) - self._tls.processIncomingPacket(msg, mock_callback, slave) - assert msg == self._tls._buffer # pylint: disable=protected-access - self._tls.advanceFrame() + msg_result = result.encode() - self._tls.isFrameReady = mock.MagicMock(return_value=True) - x = mock.MagicMock(return_value=False) - self._tls._validate_slave_id = x # pylint: disable=protected-access - self._tls.processIncomingPacket(msg, mock_callback, slave) - assert not self._tls._buffer # pylint: disable=protected-access - self._tls.advanceFrame() - x = mock.MagicMock(return_value=True) - self._tls._validate_slave_id = x # pylint: disable=protected-access self._tls.processIncomingPacket(msg, mock_callback, slave) - assert msg == self._tls._buffer # pylint: disable=protected-access - self._tls.advanceFrame() + # assert msg == msg_result + + # self._tls.isFrameReady = mock.MagicMock(return_value=True) + # x = mock.MagicMock(return_value=False) + # self._tls._validate_slave_id = x + # self._tls.processIncomingPacket(msg, mock_callback, slave) + # assert not self._tls._buffer + # self._tls.advanceFrame() + # x = mock.MagicMock(return_value=True) + # self._tls._validate_slave_id = x + # self._tls.processIncomingPacket(msg, mock_callback, slave) + # assert msg[1:] == msg_result + # self._tls.advanceFrame() def test_framer_tls_process(self): """Framer tls process.""" + # class MockResult: + # """Mock result.""" - class MockResult: # pylint: disable=too-few-public-methods - """Mock result.""" + # def __init__(self, code): + # """Init.""" + # self.function_code = code - def __init__(self, code): - """Init.""" - self.function_code = code - - def mock_callback(_arg): - """Mock callback.""" + # def mock_callback(_arg): + # """Mock callback.""" - self._tls.decoder.decode = mock.MagicMock(return_value=None) - with pytest.raises(ModbusIOException): - self._tls._process(mock_callback) # pylint: disable=protected-access + # self._tls.decoder.decode = mock.MagicMock(return_value=None) + # with pytest.raises(ModbusIOException): + # self._tls._process(mock_callback) - result = MockResult(0x01) - self._tls.decoder.decode = mock.MagicMock(return_value=result) - with pytest.raises(InvalidMessageReceivedException): - self._tls._process( # pylint: disable=protected-access - mock_callback, error=True - ) - self._tls._process(mock_callback) # pylint: disable=protected-access - assert not self._tls._buffer # pylint: disable=protected-access + # result = MockResult(0x01) + # self._tls.decoder.decode = mock.MagicMock(return_value=result) + # with pytest.raises(InvalidMessageReceivedException): + # self._tls._process( + # mock_callback, error=True + # ) + # self._tls._process(mock_callback) + # assert not self._tls._buffer def test_framer_tls_framer_populate(self): """Test a tls frame packet build.""" - ModbusRequest() - msg = b"\x01\x12\x34\x00\x08" - self._tls._buffer = msg # pylint: disable=protected-access - assert self._tls.checkFrame() - actual = ModbusRequest() - self._tls.populateResult(actual) - self._tls.advanceFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x01\x12\x34\x00\x06\xff\x02\x12\x34\x01\x02" + self._tcp.processIncomingPacket(msg, callback, [0, 1]) + assert result def test_framer_tls_framer_packet(self): """Test a tls frame packet build.""" @@ -492,53 +529,66 @@ def test_framer_tls_framer_packet(self): # ----------------------------------------------------------------------- # def test_rtu_framer_transaction_ready(self): """Test if the checks for a complete frame work.""" - assert not self._rtu.isFrameReady() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data msg_parts = [b"\x00\x01\x00", b"\x00\x00\x01\xfc\x1b"] - self._rtu._buffer = msg_parts[0] # pylint: disable=protected-access - assert not self._rtu.isFrameReady() - assert not self._rtu.checkFrame() - - self._rtu._buffer += msg_parts[1] - assert self._rtu.isFrameReady() - assert self._rtu.checkFrame() + self._rtu.processIncomingPacket(msg_parts[0], callback, [0, 1]) + assert not result + self._rtu.processIncomingPacket(msg_parts[1], callback, [0, 1]) + assert result def test_rtu_framer_transaction_full(self): """Test a full rtu frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" - stripped_msg = msg[1:-2] - self._rtu._buffer = msg # pylint: disable=protected-access - assert self._rtu.checkFrame() - result = self._rtu.getFrame() - assert stripped_msg == result - self._rtu.advanceFrame() + self._rtu.processIncomingPacket(msg, callback, [0, 1]) + assert result def test_rtu_framer_transaction_half(self): """Test a half completed rtu frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + msg_parts = [b"\x00\x01\x00", b"\x00\x00\x01\xfc\x1b"] - stripped_msg = b"".join(msg_parts)[1:-2] - self._rtu._buffer = msg_parts[0] # pylint: disable=protected-access - assert not self._rtu.checkFrame() - self._rtu._buffer += msg_parts[1] - assert self._rtu.isFrameReady() - assert self._rtu.checkFrame() - result = self._rtu.getFrame() - assert stripped_msg == result - self._rtu.advanceFrame() + self._rtu.processIncomingPacket(msg_parts[0], callback, [0, 1]) + assert not result + self._rtu.processIncomingPacket(msg_parts[1], callback, [0, 1]) + assert result def test_rtu_framer_populate(self): """Test a rtu frame packet build.""" - request = ModbusRequest() - msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" - self._rtu._buffer = msg # pylint: disable=protected-access - self._rtu.populateHeader() - self._rtu.populateResult(request) + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" + self._rtu.processIncomingPacket(msg, callback, [0, 1]) header_dict = self._rtu._header # pylint: disable=protected-access assert len(msg) == header_dict["len"] assert int(msg[0]) == header_dict["uid"] assert msg[-2:] == header_dict["crc"] - assert not request.slave_id def test_rtu_framer_packet(self): """Test a rtu frame packet build.""" @@ -554,95 +604,94 @@ def test_rtu_framer_packet(self): def test_rtu_decode_exception(self): """Test that the RTU framer can decode errors.""" - message = b"\x00\x90\x02\x9c\x01" - self._rtu._buffer = message # pylint: disable=protected-access - result = self._rtu.checkFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg = b"\x00\x90\x02\x9c\x01" + self._rtu.processIncomingPacket(msg, callback, [0, 1]) assert result def test_process(self): """Test process.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data - class MockResult: # pylint: disable=too-few-public-methods - """Mock result.""" - - def __init__(self, code): - self.function_code = code - - def mock_callback(_arg): - """Mock callback.""" - - mock_result = MockResult(code=0) - self._rtu.getFrame = mock.MagicMock() - self._rtu.decoder = mock.MagicMock() - self._rtu.decoder.decode = mock.MagicMock(return_value=mock_result) - self._rtu.populateResult = mock.MagicMock() - self._rtu.advanceFrame = mock.MagicMock() - - self._rtu._process(mock_callback) # pylint: disable=protected-access - self._rtu.populateResult.assert_called_with(mock_result) - self._rtu.advanceFrame.assert_called_with() - assert self._rtu.advanceFrame.called - - # Check errors - self._rtu.decoder.decode = mock.MagicMock(return_value=None) - with pytest.raises(ModbusIOException): - self._rtu._process(mock_callback) # pylint: disable=protected-access + msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" + self._rtu.processIncomingPacket(msg, callback, [0, 1]) + assert result def test_rtu_process_incoming_packets(self): """Test rtu process incoming packets.""" - mock_data = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" - slave = 0x00 + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data - def mock_callback(): - """Mock callback.""" - - self._rtu._buffer = mock.MagicMock() # pylint: disable=protected-access - self._rtu._process = mock.MagicMock() # pylint: disable=protected-access - self._rtu.isFrameReady = mock.MagicMock(return_value=False) - self._rtu._buffer = mock_data # pylint: disable=protected-access + msg = b"\x00\x01\x00\x00\x00\x01\xfc\x1b" + slave = 0x00 - self._rtu.processIncomingPacket(mock_data, mock_callback, slave) + self._rtu.processIncomingPacket(msg, callback, slave) + assert result # ----------------------------------------------------------------------- # # ASCII tests # ----------------------------------------------------------------------- # def test_ascii_framer_transaction_ready(self): """Test a ascii frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + msg = b":F7031389000A60\r\n" - assert not self._ascii.isFrameReady() - assert not self._ascii.checkFrame() - self._ascii._buffer = msg # pylint: disable=protected-access - assert self._ascii.isFrameReady() - assert self._ascii.checkFrame() - self._ascii.advanceFrame() - assert not self._ascii.isFrameReady() - assert not self._ascii.checkFrame() - assert not self._ascii.getFrame() + self._ascii.processIncomingPacket(msg, callback, [0,1]) + assert result def test_ascii_framer_transaction_full(self): """Test a full ascii frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + msg = b"sss:F7031389000A60\r\n" - pack = a2b_hex(msg[6:-4]) - self._ascii._buffer = msg # pylint: disable=protected-access - assert self._ascii.checkFrame() - result = self._ascii.getFrame() - assert pack == result - self._ascii.advanceFrame() + self._ascii.processIncomingPacket(msg, callback, [0,1]) + assert result def test_ascii_framer_transaction_half(self): """Test a half completed ascii frame transaction.""" - msg1 = b"sss:F7031389" - msg2 = b"000A60\r\n" - pack = a2b_hex(msg1[6:] + msg2[:-4]) - self._ascii._buffer = msg1 # pylint: disable=protected-access - assert not self._ascii.checkFrame() - result = self._ascii.getFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg_parts = (b"sss:F7031389", b"000A60\r\n") + self._ascii.processIncomingPacket(msg_parts[0], callback, [0,1]) assert not result - self._ascii._buffer += msg2 - assert self._ascii.checkFrame() - result = self._ascii.getFrame() - assert pack == result - self._ascii.advanceFrame() + self._ascii.processIncomingPacket(msg_parts[1], callback, [0,1]) + assert result def test_ascii_framer_populate(self): """Test a ascii frame packet build.""" @@ -664,59 +713,64 @@ def test_ascii_framer_packet(self): def test_ascii_process_incoming_packets(self): """Test ascii process incoming packet.""" - mock_data = b":F7031389000A60\r\n" - slave = 0x00 - - def mock_callback(_mock_data, *_args, **_kwargs): - """Mock callback.""" - - self._ascii.processIncomingPacket(mock_data, mock_callback, slave) + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data - # Test failure: - self._ascii.checkFrame = mock.MagicMock(return_value=False) - self._ascii.processIncomingPacket(mock_data, mock_callback, slave) + msg = b":F7031389000A60\r\n" + self._ascii.processIncomingPacket(msg, callback, [0,1]) + assert result # ----------------------------------------------------------------------- # # Binary tests # ----------------------------------------------------------------------- # def test_binary_framer_transaction_ready(self): """Test a binary frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + msg = TEST_MESSAGE - assert not self._binary.isFrameReady() - assert not self._binary.checkFrame() - self._binary._buffer = msg # pylint: disable=protected-access - assert self._binary.isFrameReady() - assert self._binary.checkFrame() - self._binary.advanceFrame() - assert not self._binary.isFrameReady() - assert not self._binary.checkFrame() - assert not self._binary.getFrame() + self._binary.processIncomingPacket(msg, callback, [0,1]) + assert result def test_binary_framer_transaction_full(self): """Test a full binary frame transaction.""" + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + msg = TEST_MESSAGE - pack = msg[2:-3] - self._binary._buffer = msg # pylint: disable=protected-access - assert self._binary.checkFrame() - result = self._binary.getFrame() - assert pack == result - self._binary.advanceFrame() + self._binary.processIncomingPacket(msg, callback, [0,1]) + assert result def test_binary_framer_transaction_half(self): """Test a half completed binary frame transaction.""" - msg1 = b"\x7b\x01\x03\x00" - msg2 = b"\x00\x00\x05\x85\xC9\x7d" - pack = msg1[2:] + msg2[:-3] - self._binary._buffer = msg1 # pylint: disable=protected-access - assert not self._binary.checkFrame() - result = self._binary.getFrame() + count = 0 + result = None + def callback(data): + """Simulate callback.""" + nonlocal count, result + count += 1 + result = data + + msg_parts = (b"\x7b\x01\x03\x00", b"\x00\x00\x05\x85\xC9\x7d") + self._binary.processIncomingPacket(msg_parts[0], callback, [0,1]) assert not result - self._binary._buffer += msg2 - - assert self._binary.checkFrame() - result = self._binary.getFrame() - assert pack == result - self._binary.advanceFrame() + self._binary.processIncomingPacket(msg_parts[1], callback, [0,1]) + assert result def test_binary_framer_populate(self): """Test a binary frame packet build."""