Skip to content

Commit 402d97d

Browse files
authored
remove second client instance in async mode. (#1367)
* merge ModbusClientProtocol to ModbusBaseClient.
1 parent 5d7df32 commit 402d97d

File tree

11 files changed

+250
-358
lines changed

11 files changed

+250
-358
lines changed

API_changes.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ Version 3.2.0
1010
- `ReturnSlaveNoReponseCountResponse` has been corrected to
1111
`ReturnSlaveNoResponseCountResponse`
1212
- Option `--modbus-config` for REPL server renamed to `--modbus-config-path`
13+
- client.protocol.<something> --> client.<something>
14+
- client.factory.<something> --> client.<something>
1315

1416
-------------
1517
Version 3.1.0

examples/client_async.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ async def run_async_client(client, modbus_calls=None):
119119
"""Run sync client."""
120120
_logger.info("### Client starting")
121121
await client.connect()
122-
assert client.protocol
122+
assert client.connected
123123
if modbus_calls:
124124
await modbus_calls(client)
125125
await client.close()

pymodbus/client/base.py

Lines changed: 119 additions & 161 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,6 @@ def run():
6363
**Application methods, common to all clients**:
6464
"""
6565

66-
state = ModbusTransactionState.IDLE
67-
last_frame_end: float = 0
68-
silent_interval: float = 0
69-
7066
@dataclass
7167
class _params: # pylint: disable=too-many-instance-attributes
7268
"""Parameter class."""
@@ -125,13 +121,16 @@ def __init__(
125121
self.params.kwargs = kwargs
126122

127123
# Common variables.
128-
if xframer := kwargs.get("xframer", None):
129-
self.framer = xframer
130-
else:
131-
self.framer = self.params.framer(ClientDecoder(), self)
124+
self.framer = self.params.framer(ClientDecoder(), self)
132125
self.transaction = DictTransactionManager(self, **kwargs)
133126
self.delay_ms = self.params.reconnect_delay
134-
self.use_protocol = hasattr(self, "protocol")
127+
self.use_protocol = False
128+
self._connected = False
129+
self.use_udp = False
130+
self.state = ModbusTransactionState.IDLE
131+
self.last_frame_end: float = 0
132+
self.silent_interval: float = 0
133+
self.transport = None
135134

136135
# Initialize mixin
137136
super().__init__()
@@ -185,9 +184,9 @@ def execute(self, request: ModbusRequest = None) -> ModbusResponse:
185184
:raises ConnectionException: Check exception text.
186185
"""
187186
if self.use_protocol:
188-
if not self.protocol:
187+
if not self._connected:
189188
raise ConnectionException(f"Not connected[{str(self)}]")
190-
return self.protocol.execute(request)
189+
return self.async_execute(request)
191190
if not self.connect():
192191
raise ConnectionException(f"Failed to connect[{str(self)}]")
193192
return self.transaction.execute(request)
@@ -196,6 +195,115 @@ def close(self) -> None:
196195
"""Close the underlying socket connection (call **sync/async**)."""
197196
raise NotImplementedException
198197

198+
# ----------------------------------------------------------------------- #
199+
# Merged client methods
200+
# ----------------------------------------------------------------------- #
201+
def client_made_connection(self, protocol):
202+
"""Run transport specific connection."""
203+
204+
def client_lost_connection(self, protocol):
205+
"""Run transport specific connection lost."""
206+
207+
def datagram_received(self, data, _addr):
208+
"""Receive datagram."""
209+
self.data_received(data)
210+
211+
async def async_execute(self, request=None):
212+
"""Execute requests asynchronously."""
213+
request.transaction_id = self.transaction.getNextTID()
214+
packet = self.framer.buildPacket(request)
215+
Log.debug("send: {}", packet, ":hex")
216+
if self.use_udp:
217+
self.transport.sendto(packet)
218+
else:
219+
self.transport.write(packet)
220+
req = self._build_response(request.transaction_id)
221+
if self.params.broadcast_enable and not request.unit_id:
222+
resp = b"Broadcast write sent - no response expected"
223+
else:
224+
try:
225+
resp = await asyncio.wait_for(req, timeout=self.params.timeout)
226+
except asyncio.exceptions.TimeoutError:
227+
self.connection_lost("trying to send")
228+
raise
229+
return resp
230+
231+
def connection_made(self, transport):
232+
"""Call when a connection is made.
233+
234+
The transport argument is the transport representing the connection.
235+
"""
236+
self.transport = transport
237+
Log.debug("Client connected to modbus server")
238+
self._connected = True
239+
self.client_made_connection(self)
240+
241+
def connection_lost(self, reason):
242+
"""Call when the connection is lost or closed.
243+
244+
The argument is either an exception object or None
245+
"""
246+
if self.transport:
247+
self.transport.abort()
248+
if hasattr(self.transport, "_sock"):
249+
self.transport._sock.close() # pylint: disable=protected-access
250+
self.transport = None
251+
self.client_lost_connection(self)
252+
Log.debug("Client disconnected from modbus server: {}", reason)
253+
self._connected = False
254+
for tid in list(self.transaction):
255+
self.raise_future(
256+
self.transaction.getTransaction(tid),
257+
ConnectionException("Connection lost during request"),
258+
)
259+
260+
def data_received(self, data):
261+
"""Call when some data is received.
262+
263+
data is a non-empty bytes object containing the incoming data.
264+
"""
265+
Log.debug("recv: {}", data, ":hex")
266+
self.framer.processIncomingPacket(data, self._handle_response, unit=0)
267+
268+
def create_future(self):
269+
"""Help function to create asyncio Future object."""
270+
return asyncio.Future()
271+
272+
def raise_future(self, my_future, exc):
273+
"""Set exception of a future if not done."""
274+
if not my_future.done():
275+
my_future.set_exception(exc)
276+
277+
def _handle_response(self, reply, **_kwargs):
278+
"""Handle the processed response and link to correct deferred."""
279+
if reply is not None:
280+
tid = reply.transaction_id
281+
if handler := self.transaction.getTransaction(tid):
282+
if not handler.done():
283+
handler.set_result(reply)
284+
else:
285+
Log.debug("Unrequested message: {}", reply, ":str")
286+
287+
def _build_response(self, tid):
288+
"""Return a deferred response for the current request."""
289+
my_future = self.create_future()
290+
if not self._connected:
291+
self.raise_future(my_future, ConnectionException("Client is not connected"))
292+
else:
293+
self.transaction.addTransaction(my_future, tid)
294+
return my_future
295+
296+
@property
297+
def async_connected(self):
298+
"""Return connection status."""
299+
return self._connected
300+
301+
async def async_close(self):
302+
"""Close connection."""
303+
if self.transport:
304+
self.transport.close()
305+
self._connected = False
306+
199307
# ----------------------------------------------------------------------- #
200308
# Internal methods
201309
# ----------------------------------------------------------------------- #
@@ -263,153 +371,3 @@ def __str__(self):
263371
:returns: The string representation
264372
"""
265373
return f"{self.__class__.__name__} {self.params.host}:{self.params.port}"
266-
267-
268-
class ModbusClientProtocol(
269-
ModbusBaseClient,
270-
asyncio.Protocol,
271-
asyncio.DatagramProtocol,
272-
):
273-
"""Asyncio specific implementation of asynchronous modbus client protocol."""
274-
275-
#: Factory that created this instance.
276-
factory = None
277-
transport = None
278-
279-
def __init__(
280-
self, host="127.0.0.1", port=502, source_address=None, use_udp=False, **kwargs
281-
):
282-
"""Initialize a Modbus TCP/UDP asynchronous client"""
283-
super().__init__(**kwargs)
284-
self.use_udp = use_udp
285-
self.params.host = host
286-
self.params.port = port
287-
self.params.source_address = source_address or ("", 0)
288-
289-
self._connected = False
290-
291-
def datagram_received(self, data, addr):
292-
"""Receive datagram."""
293-
self._data_received(data)
294-
295-
async def execute(self, request=None): # pylint: disable=invalid-overridden-method
296-
"""Execute requests asynchronously."""
297-
req = self._execute(request)
298-
if self.params.broadcast_enable and not request.unit_id:
299-
resp = b"Broadcast write sent - no response expected"
300-
else:
301-
try:
302-
resp = await asyncio.wait_for(req, timeout=self.params.timeout)
303-
except asyncio.exceptions.TimeoutError:
304-
self.connection_lost("trying to send")
305-
raise
306-
return resp
307-
308-
def connection_made(self, transport):
309-
"""Call when a connection is made.
310-
311-
The transport argument is the transport representing the connection.
312-
"""
313-
self.transport = transport
314-
self._connection_made()
315-
316-
if self.factory:
317-
self.factory.protocol_made_connection(self) # pylint: disable=no-member
318-
319-
async def close(self): # pylint: disable=invalid-overridden-method
320-
"""Close connection."""
321-
if self.transport:
322-
self.transport.close()
323-
self._connected = False
324-
325-
def connection_lost(self, reason):
326-
"""Call when the connection is lost or closed.
327-
328-
The argument is either an exception object or None
329-
"""
330-
if self.transport:
331-
self.transport.abort()
332-
if hasattr(self.transport, "_sock"):
333-
self.transport._sock.close() # pylint: disable=protected-access
334-
self.transport = None
335-
if self.factory:
336-
self.factory.protocol_lost_connection(self) # pylint: disable=no-member
337-
self._connection_lost(reason)
338-
339-
def data_received(self, data):
340-
"""Call when some data is received.
341-
342-
data is a non-empty bytes object containing the incoming data.
343-
"""
344-
self._data_received(data)
345-
346-
def create_future(self):
347-
"""Help function to create asyncio Future object."""
348-
return asyncio.Future()
349-
350-
def resolve_future(self, my_future, result):
351-
"""Resolve the completed future and sets the result."""
352-
if not my_future.done():
353-
my_future.set_result(result)
354-
355-
def raise_future(self, my_future, exc):
356-
"""Set exception of a future if not done."""
357-
if not my_future.done():
358-
my_future.set_exception(exc)
359-
360-
def _connection_made(self):
361-
"""Call upon a successful client connection."""
362-
Log.debug("Client connected to modbus server")
363-
self._connected = True
364-
365-
def _connection_lost(self, reason):
366-
"""Call upon a client disconnect."""
367-
Log.debug("Client disconnected from modbus server: {}", reason)
368-
self._connected = False
369-
for tid in list(self.transaction):
370-
self.raise_future(
371-
self.transaction.getTransaction(tid),
372-
ConnectionException("Connection lost during request"),
373-
)
374-
375-
@property
376-
def connected(self):
377-
"""Return connection status."""
378-
return self._connected
379-
380-
def write_transport(self, packet):
381-
"""Write transport."""
382-
if self.use_udp:
383-
return self.transport.sendto(packet)
384-
return self.transport.write(packet)
385-
386-
def _execute(self, request, **kwargs): # pylint: disable=unused-argument
387-
"""Start the producer to send the next request to consumer.write(Frame(request))."""
388-
request.transaction_id = self.transaction.getNextTID()
389-
packet = self.framer.buildPacket(request)
390-
Log.debug("send: {}", packet, ":hex")
391-
self.write_transport(packet)
392-
return self._build_response(request.transaction_id)
393-
394-
def _data_received(self, data):
395-
"""Get response, check for valid message, decode result."""
396-
Log.debug("recv: {}", data, ":hex")
397-
self.framer.processIncomingPacket(data, self._handle_response, unit=0)
398-
399-
def _handle_response(self, reply, **kwargs): # pylint: disable=unused-argument
400-
"""Handle the processed response and link to correct deferred."""
401-
if reply is not None:
402-
tid = reply.transaction_id
403-
if handler := self.transaction.getTransaction(tid):
404-
self.resolve_future(handler, reply)
405-
else:
406-
Log.debug("Unrequested message: {}", reply, ":str")
407-
408-
def _build_response(self, tid):
409-
"""Return a deferred response for the current request."""
410-
my_future = self.create_future()
411-
if not self._connected:
412-
self.raise_future(my_future, ConnectionException("Client is not connected"))
413-
else:
414-
self.transaction.addTransaction(my_future, tid)
415-
return my_future

0 commit comments

Comments
 (0)