Skip to content

Commit 213498e

Browse files
committed
Refactor transaction handling to better separate async and sync code.
1 parent 5e146c0 commit 213498e

File tree

15 files changed

+176
-158
lines changed

15 files changed

+176
-158
lines changed

pymodbus/client/base.py

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import asyncio
55
import socket
6+
from abc import abstractmethod
67
from collections.abc import Awaitable, Callable
78
from dataclasses import dataclass
89
from typing import Any, cast
@@ -14,7 +15,7 @@
1415
from pymodbus.framer import FRAMER_NAME_TO_CLASS, FramerType, ModbusFramer
1516
from pymodbus.logging import Log
1617
from pymodbus.pdu import ModbusRequest, ModbusResponse
17-
from pymodbus.transaction import ModbusTransactionManager
18+
from pymodbus.transaction import SyncModbusTransactionManager
1819
from pymodbus.transport import CommParams
1920
from pymodbus.utilities import ModbusTransactionState
2021

@@ -53,7 +54,6 @@ def __init__(
5354
framer: FramerType,
5455
timeout: float = 3,
5556
retries: int = 3,
56-
retry_on_empty: bool = False,
5757
broadcast_enable: bool = False,
5858
reconnect_delay: float = 0.1,
5959
reconnect_delay_max: float = 300,
@@ -81,8 +81,6 @@ def __init__(
8181
stopbits=kwargs.get("stopbits", None),
8282
handle_local_echo=kwargs.get("handle_local_echo", False),
8383
),
84-
retries,
85-
retry_on_empty,
8684
on_connect_callback,
8785
)
8886
self.no_resend_on_retry = no_resend_on_retry
@@ -143,7 +141,7 @@ def idle_time(self) -> float:
143141
return 0
144142
return self.last_frame_end + self.silent_interval
145143

146-
def execute(self, request: ModbusRequest | None = None):
144+
def execute(self, request: ModbusRequest):
147145
"""Execute request and get response (call **sync/async**).
148146
149147
:param request: The request to process
@@ -165,7 +163,7 @@ async def async_execute(self, request) -> ModbusResponse:
165163
count = 0
166164
while count <= self.retries:
167165
async with self._lock:
168-
req = self.build_response(request.transaction_id)
166+
req = self.build_response(request)
169167
if not count or not self.no_resend_on_retry:
170168
self.ctx.framer.resetFrame()
171169
self.ctx.send(packet)
@@ -187,25 +185,17 @@ async def async_execute(self, request) -> ModbusResponse:
187185

188186
return resp # type: ignore[return-value]
189187

190-
def build_response(self, tid):
188+
def build_response(self, request: ModbusRequest):
191189
"""Return a deferred response for the current request."""
192190
my_future: asyncio.Future = asyncio.Future()
191+
request.fut = my_future
193192
if not self.ctx.transport:
194193
if not my_future.done():
195194
my_future.set_exception(ConnectionException("Client is not connected"))
196195
else:
197-
self.ctx.transaction.addTransaction(my_future, tid)
196+
self.ctx.transaction.addTransaction(request)
198197
return my_future
199198

200-
# ----------------------------------------------------------------------- #
201-
# Internal methods
202-
# ----------------------------------------------------------------------- #
203-
def recv(self, size):
204-
"""Receive data.
205-
206-
:meta private:
207-
"""
208-
209199
# ----------------------------------------------------------------------- #
210200
# The magic methods
211201
# ----------------------------------------------------------------------- #
@@ -309,10 +299,10 @@ def __init__(
309299
self.slaves: list[int] = []
310300

311301
# Common variables.
312-
self.framer = FRAMER_NAME_TO_CLASS.get(
302+
self.framer: ModbusFramer = FRAMER_NAME_TO_CLASS.get(
313303
framer, cast(type[ModbusFramer], framer)
314304
)(ClientDecoder(), self)
315-
self.transaction = ModbusTransactionManager(
305+
self.transaction = SyncModbusTransactionManager(
316306
self, retries=retries, retry_on_empty=retry_on_empty, **kwargs
317307
)
318308
self.reconnect_delay_current = self.params.reconnect_delay or 0
@@ -346,7 +336,7 @@ def idle_time(self) -> float:
346336
return 0
347337
return self.last_frame_end + self.silent_interval
348338

349-
def execute(self, request: ModbusRequest | None = None) -> ModbusResponse:
339+
def execute(self, request: ModbusRequest) -> ModbusResponse:
350340
"""Execute request and get response (call **sync/async**).
351341
352342
:param request: The request to process
@@ -360,22 +350,28 @@ def execute(self, request: ModbusRequest | None = None) -> ModbusResponse:
360350
# ----------------------------------------------------------------------- #
361351
# Internal methods
362352
# ----------------------------------------------------------------------- #
363-
def send(self, request):
353+
def _start_send(self):
364354
"""Send request.
365355
366356
:meta private:
367357
"""
368358
if self.state != ModbusTransactionState.RETRYING:
369359
Log.debug('New Transaction state "SENDING"')
370360
self.state = ModbusTransactionState.SENDING
371-
return request
372361

373-
def recv(self, size):
362+
@abstractmethod
363+
def send(self, request: bytes) -> int:
364+
"""Send request.
365+
366+
:meta private:
367+
"""
368+
369+
@abstractmethod
370+
def recv(self, size: int | None) -> bytes:
374371
"""Receive data.
375372
376373
:meta private:
377374
"""
378-
return size
379375

380376
@classmethod
381377
def get_address_family(cls, address):

pymodbus/client/modbusclientprotocol.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ def __init__(
2929
self,
3030
framer: FramerType,
3131
params: CommParams,
32-
retries: int,
33-
retry_on_empty: bool,
3432
on_connect_callback: Callable[[bool], None] | None = None,
3533
) -> None:
3634
"""Initialize a client instance."""
@@ -45,17 +43,16 @@ def __init__(
4543
self.framer = FRAMER_NAME_TO_CLASS.get(
4644
framer, cast(type[ModbusFramer], framer)
4745
)(ClientDecoder(), self)
48-
self.transaction = ModbusTransactionManager(
49-
self, retries=retries, retry_on_empty=retry_on_empty
50-
)
46+
self.transaction = ModbusTransactionManager()
5147

5248
def _handle_response(self, reply, **_kwargs):
5349
"""Handle the processed response and link to correct deferred."""
5450
if reply is not None:
5551
tid = reply.transaction_id
5652
if handler := self.transaction.getTransaction(tid):
57-
if not handler.done():
58-
handler.set_result(reply)
53+
reply.request = handler
54+
if not handler.fut.done():
55+
handler.fut.set_result(reply)
5956
else:
6057
Log.debug("Unrequested message: {}", reply, ":str")
6158

pymodbus/client/serial.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ def connected(self):
193193
"""Connect internal."""
194194
return self.connect()
195195

196-
def connect(self):
196+
def connect(self) -> bool:
197197
"""Connect to the modbus serial server."""
198198
if self.socket:
199199
return True
@@ -227,25 +227,26 @@ def _in_waiting(self):
227227
"""Return waiting bytes."""
228228
return getattr(self.socket, "in_waiting") if hasattr(self.socket, "in_waiting") else getattr(self.socket, "inWaiting")()
229229

230-
def send(self, request):
230+
def send(self, request: bytes) -> int:
231231
"""Send data on the underlying socket.
232232
233233
If receive buffer still holds some data then flush it.
234234
235235
Sleep if last send finished less than 3.5 character times ago.
236236
"""
237-
super().send(request)
237+
super()._start_send()
238238
if not self.socket:
239239
raise ConnectionException(str(self))
240240
if request:
241241
if waitingbytes := self._in_waiting():
242242
result = self.socket.read(waitingbytes)
243243
Log.warning("Cleanup recv buffer before send: {}", result, ":hex")
244-
size = self.socket.write(request)
244+
if (size := self.socket.write(request)) is None:
245+
size = 0
245246
return size
246247
return 0
247248

248-
def _wait_for_data(self):
249+
def _wait_for_data(self) -> int:
249250
"""Wait for data."""
250251
size = 0
251252
more_data = False
@@ -264,9 +265,8 @@ def _wait_for_data(self):
264265
time.sleep(self._recv_interval)
265266
return size
266267

267-
def recv(self, size):
268+
def recv(self, size: int | None) -> bytes:
268269
"""Read data from the underlying descriptor."""
269-
super().recv(size)
270270
if not self.socket:
271271
raise ConnectionException(str(self))
272272
if size is None:
@@ -276,7 +276,7 @@ def recv(self, size):
276276
result = self.socket.read(size)
277277
return result
278278

279-
def is_socket_open(self):
279+
def is_socket_open(self) -> bool:
280280
"""Check if socket is open."""
281281
if self.socket:
282282
return self.socket.is_open

pymodbus/client/tcp.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,16 +180,15 @@ def close(self):
180180

181181
def send(self, request):
182182
"""Send data on the underlying socket."""
183-
super().send(request)
183+
super()._start_send()
184184
if not self.socket:
185185
raise ConnectionException(str(self))
186186
if request:
187187
return self.socket.send(request)
188188
return 0
189189

190-
def recv(self, size):
190+
def recv(self, size: int | None) -> bytes:
191191
"""Read data from the underlying descriptor."""
192-
super().recv(size)
193192
if not self.socket:
194193
raise ConnectionException(str(self))
195194

@@ -241,7 +240,7 @@ def recv(self, size):
241240

242241
return b"".join(data)
243242

244-
def _handle_abrupt_socket_close(self, size, data, duration):
243+
def _handle_abrupt_socket_close(self, size: int | None, data: list[bytes], duration: float) -> bytes:
245244
"""Handle unexpected socket close by remote end.
246245
247246
Intended to be invoked after determining that the remote end
@@ -271,7 +270,7 @@ def _handle_abrupt_socket_close(self, size, data, duration):
271270
msg += " without response from slave before it closed connection"
272271
raise ConnectionException(msg)
273272

274-
def is_socket_open(self):
273+
def is_socket_open(self) -> bool:
275274
"""Check if socket is open."""
276275
return self.socket is not None
277276

pymodbus/client/udp.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,12 +169,12 @@ def close(self):
169169
"""
170170
self.socket = None
171171

172-
def send(self, request):
172+
def send(self, request: bytes) -> int:
173173
"""Send data on the underlying socket.
174174
175175
:meta private:
176176
"""
177-
super().send(request)
177+
super()._start_send()
178178
if not self.socket:
179179
raise ConnectionException(str(self))
180180
if request:
@@ -183,14 +183,15 @@ def send(self, request):
183183
)
184184
return 0
185185

186-
def recv(self, size):
186+
def recv(self, size: int | None) -> bytes:
187187
"""Read data from the underlying descriptor.
188188
189189
:meta private:
190190
"""
191-
super().recv(size)
192191
if not self.socket:
193192
raise ConnectionException(str(self))
193+
if size is None:
194+
size = 0
194195
return self.socket.recvfrom(size)[0]
195196

196197
def is_socket_open(self):

pymodbus/framer/old_framer_base.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,17 @@
33
from __future__ import annotations
44

55
import time
6-
from typing import Any
6+
from typing import TYPE_CHECKING, Any
77

88
from pymodbus.factory import ClientDecoder, ServerDecoder
99
from pymodbus.framer.base import FramerBase
1010
from pymodbus.logging import Log
11+
from pymodbus.pdu import ModbusRequest
1112

1213

14+
if TYPE_CHECKING:
15+
from pymodbus.client.base import ModbusBaseSyncClient
16+
1317
# Unit ID, Function Code
1418
BYTE_ORDER = ">"
1519
FRAME_HEADER = "BB"
@@ -29,24 +33,28 @@ class ModbusFramer:
2933
def __init__(
3034
self,
3135
decoder: ClientDecoder | ServerDecoder,
32-
client,
36+
client: ModbusBaseSyncClient,
3337
) -> None:
3438
"""Initialize a new instance of the framer.
3539
3640
:param decoder: The decoder implementation to use
3741
"""
3842
self.decoder = decoder
3943
self.client = client
40-
self._header: dict[str, Any] = {
44+
self._header: dict[str, Any]
45+
self._reset_header()
46+
self._buffer = b""
47+
self.message_handler: FramerBase
48+
49+
def _reset_header(self) -> None:
50+
self._header = {
4151
"lrc": "0000",
4252
"len": 0,
4353
"uid": 0x00,
4454
"tid": 0,
4555
"pid": 0,
4656
"crc": b"\x00\x00",
4757
}
48-
self._buffer = b""
49-
self.message_handler: FramerBase
5058

5159
def _validate_slave_id(self, slaves: list, single: bool) -> bool:
5260
"""Validate if the received data is valid for the client.
@@ -63,7 +71,7 @@ def _validate_slave_id(self, slaves: list, single: bool) -> bool:
6371
return True
6472
return self._header["uid"] in slaves
6573

66-
def sendPacket(self, message):
74+
def sendPacket(self, message: bytes):
6775
"""Send packets on the bus.
6876
6977
With 3.5char delay between frames
@@ -72,7 +80,7 @@ def sendPacket(self, message):
7280
"""
7381
return self.client.send(message)
7482

75-
def recvPacket(self, size):
83+
def recvPacket(self, size: int) -> bytes:
7684
"""Receive packet from the bus.
7785
7886
With specified len
@@ -150,7 +158,7 @@ def frameProcessIncomingPacket(
150158
) -> None:
151159
"""Process new packet pattern."""
152160

153-
def buildPacket(self, message) -> bytes:
161+
def buildPacket(self, message: ModbusRequest) -> bytes:
154162
"""Create a ready to send modbus packet.
155163
156164
:param message: The populated request/response to send

pymodbus/framer/old_framer_rtu.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ def check_frame(self):
146146
Log.debug("Frame check failed, ignoring!!")
147147
x = self._buffer
148148
self.resetFrame()
149-
self._buffer = x
149+
self._buffer: bytes = x
150150
skip_cur_frame = True
151151
continue
152152
start = self._hsize
@@ -176,7 +176,7 @@ def buildPacket(self, message):
176176
message.transaction_id = 0
177177
return packet
178178

179-
def sendPacket(self, message):
179+
def sendPacket(self, message: bytes) -> int:
180180
"""Send packets on the bus with 3.5char delay between frames.
181181
182182
:param message: Message to be sent over the bus

0 commit comments

Comments
 (0)