Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 26 additions & 29 deletions pymodbus/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
"BinaryPayloadDecoder",
]

from array import array

# pylint: disable=missing-type-doc
from struct import pack, unpack

Expand All @@ -23,9 +25,6 @@
)


WC = {"b": 1, "h": 2, "e": 2, "i": 4, "l": 4, "q": 8, "f": 4, "d": 8}


class BinaryPayloadBuilder:
"""A utility that helps build payload messages to be written with the various modbus messages.

Expand Down Expand Up @@ -69,15 +68,14 @@ def _pack_words(self, fstring: str, value) -> bytes:
:return:
"""
value = pack(f"!{fstring}", value)
wordorder = WC.get(fstring.lower()) // 2 # type: ignore[operator]
upperbyte = f"!{wordorder}H"
payload = unpack(upperbyte, value)

if self._wordorder == Endian.LITTLE:
payload = payload[::-1]

fstring = self._byteorder + "H"
return b"".join(pack(fstring, word) for word in payload)
if Endian.LITTLE in {self._byteorder, self._wordorder}:
value = array("H", value)
if self._byteorder == Endian.LITTLE:
value.byteswap()
if self._wordorder == Endian.LITTLE:
value.reverse()
value = value.tobytes()
return value

def encode(self) -> bytes:
"""Get the payload buffer encoded in bytes."""
Expand Down Expand Up @@ -295,7 +293,7 @@ def fromRegisters(
"""
Log.debug("{}", registers)
if isinstance(registers, list): # repack into flat binary
payload = b"".join(pack("!H", x) for x in registers)
payload = pack(f"!{len(registers)}H", *registers)
return cls(payload, byteorder, wordorder)
raise ParameterException("Invalid collection of registers supplied")

Expand Down Expand Up @@ -324,7 +322,7 @@ def fromCoils(
return cls(payload, byteorder)
raise ParameterException("Invalid collection of coils supplied")

def _unpack_words(self, fstring: str, handle) -> bytes:
def _unpack_words(self, handle) -> bytes:
"""Unpack words based on the word order and byte order.

# ---------------------------------------------- #
Expand All @@ -336,15 +334,14 @@ def _unpack_words(self, fstring: str, handle) -> bytes:
:param handle: Value to be unpacked
:return:
"""
wc_value = WC.get(fstring.lower()) // 2 # type: ignore[operator]
handle = unpack(f"!{wc_value}H", handle)
if self._wordorder == Endian.LITTLE:
handle = list(reversed(handle))

# Repack as unsigned Integer
handle = [pack(self._byteorder + "H", p) for p in handle]
if Endian.LITTLE in {self._byteorder, self._wordorder}:
handle = array("H", handle)
if self._byteorder == Endian.LITTLE:
handle.byteswap()
if self._wordorder == Endian.LITTLE:
handle.reverse()
handle = handle.tobytes()
Log.debug("handle: {}", handle)
handle = b"".join(handle)
return handle

def reset(self):
Expand Down Expand Up @@ -377,15 +374,15 @@ def decode_32bit_uint(self):
self._pointer += 4
fstring = "I"
handle = self._payload[self._pointer - 4 : self._pointer]
handle = self._unpack_words(fstring, handle)
handle = self._unpack_words(handle)
return unpack("!" + fstring, handle)[0]

def decode_64bit_uint(self):
"""Decode a 64 bit unsigned int from the buffer."""
self._pointer += 8
fstring = "Q"
handle = self._payload[self._pointer - 8 : self._pointer]
handle = self._unpack_words(fstring, handle)
handle = self._unpack_words(handle)
return unpack("!" + fstring, handle)[0]

def decode_8bit_int(self):
Expand All @@ -407,39 +404,39 @@ def decode_32bit_int(self):
self._pointer += 4
fstring = "i"
handle = self._payload[self._pointer - 4 : self._pointer]
handle = self._unpack_words(fstring, handle)
handle = self._unpack_words(handle)
return unpack("!" + fstring, handle)[0]

def decode_64bit_int(self):
"""Decode a 64 bit signed int from the buffer."""
self._pointer += 8
fstring = "q"
handle = self._payload[self._pointer - 8 : self._pointer]
handle = self._unpack_words(fstring, handle)
handle = self._unpack_words(handle)
return unpack("!" + fstring, handle)[0]

def decode_16bit_float(self):
"""Decode a 16 bit float from the buffer."""
self._pointer += 2
fstring = "e"
handle = self._payload[self._pointer - 2 : self._pointer]
handle = self._unpack_words(fstring, handle)
handle = self._unpack_words(handle)
return unpack("!" + fstring, handle)[0]

def decode_32bit_float(self):
"""Decode a 32 bit float from the buffer."""
self._pointer += 4
fstring = "f"
handle = self._payload[self._pointer - 4 : self._pointer]
handle = self._unpack_words(fstring, handle)
handle = self._unpack_words(handle)
return unpack("!" + fstring, handle)[0]

def decode_64bit_float(self):
"""Decode a 64 bit float(double) from the buffer."""
self._pointer += 8
fstring = "d"
handle = self._payload[self._pointer - 8 : self._pointer]
handle = self._unpack_words(fstring, handle)
handle = self._unpack_words(handle)
return unpack("!" + fstring, handle)[0]

def decode_string(self, size=1):
Expand Down