"""Launch two pymodbus simulator servers side by side for mock testing."""
import asyncio
import logging
import signal
from typing import Any, Awaitable

# Alternative import location, kept for reference:
# from pymodbus.server.simulator.http_server import ModbusSimulatorServer
from pymodbus.server import ModbusSimulatorServer

# Initialize the logger.
logging.basicConfig(
    format='%(asctime)s %(levelname)-10s %(message)s',
    level=logging.DEBUG,
)
# Uncomment to restrict the pymodbus module to info-level messages:
# logging.getLogger("pymodbus.logging").setLevel(logging.INFO)

# The logger used by this script.
logger = logging.getLogger('MockServer')


async def make_mock_server(server: str, device: str, defsfile: str) -> None:
    """Create a simulator server and run it until it stops.

    Args:
        server: Passed to ``ModbusSimulatorServer`` as ``modbus_server``
            (presumably the name of a server entry in the definitions
            file -- confirm against the pymodbus simulator docs).
        device: Passed as ``modbus_device``.
        defsfile: Passed as ``json_file``; path to the JSON definitions.
    """
    # Use a distinct local name so the ``server`` argument is not shadowed.
    simulator = ModbusSimulatorServer(
        modbus_server=server,
        modbus_device=device,
        json_file=defsfile,
    )
    await simulator.run_forever()


def raise_graceful_exit(*_args):
    """Print a shutdown notice and raise ``SystemExit``.

    Accepts and ignores any positional arguments, so it can be used as a
    ``signal.signal`` handler. It is not registered anywhere in this script.
    """
    print("receiving shutdown signal now")
    raise SystemExit


async def main(f1: Awaitable[Any], f2: Awaitable[Any]) -> None:
    """Run both awaitables concurrently until they finish.

    ``asyncio.gather`` is used instead of ``asyncio.wait`` so that an
    exception raised by either server propagates to the caller instead of
    being silently left on the task. Tasks and bare coroutines are both
    accepted.
    """
    await asyncio.gather(f1, f2)


if __name__ == '__main__':
    # TODO: start 2 servers
    try:
        # asyncio.run creates the event loop, cancels leftover tasks and
        # closes the loop even when a server raises.
        asyncio.run(
            main(
                make_mock_server('tty_server', 'wrouter', './registers.json'),
                make_mock_server('tcp_server', 'wrouter', './registers.json'),
            )
        )
    except KeyboardInterrupt:
        logger.info("Interrupted, shutting down mock servers")