Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 0 additions & 131 deletions examples/client_test_tool.py

This file was deleted.

221 changes: 221 additions & 0 deletions examples/package_test_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
#!/usr/bin/env python3
"""Pymodbus client testing tool.

usage::

package_test_tool.py

This is a tool to test how a client react to responses from a malicious server using:

ClientTester

and to how the server react to requests using:

ServerTester

The tool is intended for users with advanced modbus protocol knowledge.

When testing a client the server is replaced by a stub and the nullmodem solution.

There are 4 functions which can be modified to test the client/server functionality.

*** client_calls(client) ***

Called when the client is connected.

The full client API is available, just as if it was a normal App using pymodbus

*** server_calls(transport) ***

Called when the server is listening and stub connected.

Send raw data packets to the server (remark data is frame+request)

*** handle_client_data(transport, data) ***

Called when data is received from the client/server (remark data is frame+request)

The function generates frame+response and sends it.

And one function which can be modified to test the server functionality:
"""
from __future__ import annotations

import asyncio
from typing import Callable

import pymodbus.client as modbusClient
import pymodbus.server as modbusServer
from pymodbus import Framer, pymodbus_apply_logging_config
from pymodbus.datastore import (
ModbusSequentialDataBlock,
ModbusServerContext,
ModbusSlaveContext,
)
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.logging import Log
from pymodbus.transport import NULLMODEM_HOST, CommParams, CommType, ModbusProtocol


class TransportStub(ModbusProtocol):
"""Protocol layer including transport."""

def __init__(
self,
params: CommParams,
is_server: bool,
handler: Callable[[bytes], bytes],
) -> None:
"""Initialize a stub instance."""
self.stub_handle_data = handler
super().__init__(params, is_server)

async def start_run(self):
"""Call need functions to start server/client."""
if self.is_server:
return await self.transport_listen()
return await self.transport_connect()

def callback_data(self, data: bytes, addr: tuple | None = None) -> int:
"""Handle received data."""
self.stub_handle_data(self, data)
return len(data)

def callback_new_connection(self) -> ModbusProtocol:
"""Call when listener receive new connection request."""
new_stub = TransportStub(self.comm_params, False, self.stub_handle_data)
new_stub.stub_handle_data = self.stub_handle_data
return new_stub


class ClientTester: # pylint: disable=too-few-public-methods
"""Main program."""

def __init__(self, comm: CommType):
"""Initialize runtime tester."""
self.comm = comm

if comm == CommType.TCP:
self.client = modbusClient.AsyncModbusTcpClient(
NULLMODEM_HOST,
port=5004,
)
elif comm == CommType.SERIAL:
self.client = modbusClient.AsyncModbusSerialClient(
f"{NULLMODEM_HOST}:5004",
)
else:
raise RuntimeError("ERROR: CommType not implemented")
server_params = self.client.comm_params.copy()
server_params.source_address = (f"{NULLMODEM_HOST}:5004", 5004)
self.stub = TransportStub(server_params, True, handle_client_data)


async def run(self):
"""Execute test run."""
pymodbus_apply_logging_config()
Log.debug("--> Start testing.")
await self.stub.start_run()
await self.client.connect()
assert self.client.connected
await client_calls(self.client)
Log.debug("--> Closing.")
self.client.close()


class ServerTester: # pylint: disable=too-few-public-methods
"""Main program."""

def __init__(self, comm: CommType):
"""Initialize runtime tester."""
self.comm = comm
self.store = ModbusSlaveContext(
di=ModbusSequentialDataBlock(0, [17] * 100),
co=ModbusSequentialDataBlock(0, [17] * 100),
hr=ModbusSequentialDataBlock(0, [17] * 100),
ir=ModbusSequentialDataBlock(0, [17] * 100),
)
self.context = ModbusServerContext(slaves=self.store, single=True)
self.identity = ModbusDeviceIdentification(
info_name={"VendorName": "VendorName"}
)
if comm == CommType.TCP:
self.server = modbusServer.ModbusTcpServer(
self.context,
framer=Framer.SOCKET,
identity=self.identity,
address=(NULLMODEM_HOST, 5004),
)
elif comm == CommType.SERIAL:
self.server = modbusServer.ModbusSerialServer(
self.context,
framer=Framer.SOCKET,
identity=self.identity,
port=f"{NULLMODEM_HOST}:5004",
)
else:
raise RuntimeError("ERROR: CommType not implemented")
client_params = self.server.comm_params.copy()
client_params.host = client_params.source_address[0]
client_params.port = client_params.source_address[1]
client_params.timeout_connect = 1.0
self.stub = TransportStub(client_params, False, handle_server_data)


async def run(self):
"""Execute test run."""
pymodbus_apply_logging_config()
Log.debug("--> Start testing.")
await self.server.transport_listen()
await self.stub.start_run()
await server_calls(self.stub)
Log.debug("--> Shutting down.")
await self.server.shutdown()


async def main(comm: CommType, use_server: bool):
"""Combine setup and run."""
if use_server:
test = ServerTester(comm)
else:
test = ClientTester(comm)
await test.run()


# -------------- USER CHANGES --------------

async def client_calls(client):
"""Test client API."""
Log.debug("--> Client calls starting.")
_resp = await client.read_holding_registers(address=124, count=4, slave=0)


async def server_calls(transport: ModbusProtocol):
"""Test client API."""
Log.debug("--> Client calls starting.")
_resp = transport.transport_send(b'\x00\x01\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01')
await asyncio.sleep(1)
print("--> JIX done")


def handle_client_data(transport: ModbusProtocol, data: bytes):
"""Respond to request at transport level."""
Log.debug("--> stub called with request {}.", data, ":hex")
response = b'\x01\x03\x08\x00\x05\x00\x05\x00\x00\x00\x00\x0c\xd7'

# Multiple send is allowed, to test fragmentation
# for data in response:
# to_send = data.to_bytes()
# transport.transport_send(to_send)
transport.transport_send(response)


def handle_server_data(_transport: ModbusProtocol, data: bytes):
"""Respond to request at transport level."""
Log.debug("--> stub called with request {}.", data, ":hex")


if __name__ == "__main__":
# True for Server test, False for Client test
# asyncio.run(main(CommType.SERIAL, False), debug=True)
asyncio.run(main(CommType.TCP, True), debug=True)
2 changes: 1 addition & 1 deletion pymodbus/framer/socket_framer.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def checkFrame(self):
elif len(self._buffer) - self._hsize + 1 >= self._header["len"]:
return True
# we don't have enough of a message yet, wait
Log.debug("Frame check failed, missing part of message!!")
return False

def advanceFrame(self):
Expand Down Expand Up @@ -131,7 +132,6 @@ def frameProcessIncomingPacket(self, single, callback, slave, tid=None, **kwargs
"""
while True:
if not self.checkFrame():
Log.debug("Frame check failed, ignoring!!")
return
if not self._validate_slave_id(slave, single):
header_txt = self._header["uid"]
Expand Down