@@ -151,86 +151,103 @@ def _parse_val(arg_name, val):
151151 return kwargs , execute
152152
153153
154- def cli (client ): # pylint: disable=too-complex
155- """Run client definition."""
156- use_keys = KeyBindings ()
157- history_file = pathlib .Path .home ().joinpath (".pymodhis" )
158-
159- @use_keys .add ("c-space" )
160- def _ (event ):
161- """Initialize autocompletion, or select the next completion."""
162- buff = event .app .current_buffer
163- if buff .complete_state :
164- buff .complete_next ()
165- else :
166- buff .start_completion (select_first = False )
167-
168- @use_keys .add ("enter" , filter = has_selected_completion )
169- def _ (event ):
170- """Make the enter key work as the tab key only when showing the menu."""
171- event .current_buffer .complete_state = None
172- buffer = event .cli .current_buffer
173- buffer .complete_state = None
174-
175- session = PromptSession (
176- lexer = PygmentsLexer (PythonLexer ),
177- completer = CmdCompleter (client ),
178- style = style ,
179- complete_while_typing = True ,
180- bottom_toolbar = bottom_toolbar ,
181- key_bindings = use_keys ,
182- history = FileHistory (history_file ),
183- auto_suggest = AutoSuggestFromHistory (),
184- )
185- click .secho (TITLE , fg = "green" )
186- result = None
187- while True : # pylint: disable=too-many-nested-blocks
188- try :
189-
190- text = session .prompt ("> " , complete_while_typing = True )
191- if text .strip ().lower () == "help" :
192- print_formatted_text (HTML ("<u>Available commands:</u>" ))
193- for cmd , obj in sorted (session .completer .commands .items ()):
194- if cmd != "help" :
195- print_formatted_text (
196- HTML (
197- "<skyblue>{:45s}</skyblue>" # pylint: disable=consider-using-f-string
198- "<seagreen>{:100s}"
199- "</seagreen>" .format (cmd , obj .help_text )
200- )
201- )
202-
203- continue
204- if text .strip ().lower () == "exit" :
205- raise EOFError ()
206- if text .strip ().lower ().startswith ("client." ):
207- text = text .strip ().split ()
208- cmd = text [0 ].split ("." )[1 ]
209- args = text [1 :]
210- kwargs , execute = _process_args (args , string = False )
211- if execute :
212- if text [0 ] in CLIENT_ATTRIBUTES :
213- result = Result (getattr (client , cmd ))
214- else :
215- result = Result (getattr (client , cmd )(** kwargs ))
216- result .print_result ()
217- elif text .strip ().lower ().startswith ("result." ) and result :
218- words = text .lower ().split ()
219- if words [0 ] == "result.raw" :
220- result .raw ()
221- if words [0 ] == "result.decode" :
222- args = words [1 :]
223- kwargs , execute = _process_args (args )
224- if execute :
225- result .decode (** kwargs )
226- except KeyboardInterrupt :
227- continue # Control-C pressed. Try again.
228- except EOFError :
229- break # Control-D pressed.
230- except Exception as exc : # pylint: disable=broad-except
231- click .secho (str (exc ), fg = "red" )
232-
233- click .secho ("GoodBye!" , fg = "blue" )
154+ class CLI : # pylint: disable=too-few-public-methods
155+ """Client definition."""
156+
157+ def __init__ (self , client ):
158+ """Set up client and keybindings."""
159+
160+ use_keys = KeyBindings ()
161+ history_file = pathlib .Path .home ().joinpath (".pymodhis" )
162+ self .client = client
163+
164+ @use_keys .add ("c-space" )
165+ def _ (event ):
166+ """Initialize autocompletion, or select the next completion."""
167+ buff = event .app .current_buffer
168+ if buff .complete_state :
169+ buff .complete_next ()
170+ else :
171+ buff .start_completion (select_first = False )
172+
173+ @use_keys .add ("enter" , filter = has_selected_completion )
174+ def _ (event ):
175+ """Make the enter key work as the tab key only when showing the menu."""
176+ event .current_buffer .complete_state = None
177+ buffer = event .cli .current_buffer
178+ buffer .complete_state = None
179+
180+ self .session = PromptSession (
181+ lexer = PygmentsLexer (PythonLexer ),
182+ completer = CmdCompleter (client ),
183+ style = style ,
184+ complete_while_typing = True ,
185+ bottom_toolbar = bottom_toolbar ,
186+ key_bindings = use_keys ,
187+ history = FileHistory (history_file ),
188+ auto_suggest = AutoSuggestFromHistory (),
189+ )
190+ click .secho (TITLE , fg = "green" )
191+
192+ def _print_command_help (self , commands ):
193+ """Print a list of commands with help text."""
194+ for cmd , obj in sorted (commands .items ()):
195+ if cmd != "help" :
196+ print_formatted_text (
197+ HTML (
198+ "<skyblue>{:45s}</skyblue>" # pylint: disable=consider-using-f-string
199+ "<seagreen>{:100s}"
200+ "</seagreen>" .format (cmd , obj .help_text )
201+ )
202+ )
203+
204+ def _process_client (self , text , client ):
205+ """Process client commands."""
206+ text = text .strip ().split ()
207+ cmd = text [0 ].split ("." )[1 ]
208+ args = text [1 :]
209+ kwargs , execute = _process_args (args , string = False )
210+ if execute :
211+ if text [0 ] in CLIENT_ATTRIBUTES :
212+ result = Result (getattr (client , cmd ))
213+ else :
214+ result = Result (getattr (client , cmd )(** kwargs ))
215+ result .print_result ()
216+
217+ def _process_result (self , text , result ):
218+ """Process result commands."""
219+ words = text .split ()
220+ if words [0 ] == "result.raw" :
221+ result .raw ()
222+ if words [0 ] == "result.decode" :
223+ args = words [1 :]
224+ kwargs , execute = _process_args (args )
225+ if execute :
226+ result .decode (** kwargs )
227+
228+ def run (self ):
229+ """Run the REPL."""
230+ result = None
231+ while True :
232+ try :
233+ text = self .session .prompt ("> " , complete_while_typing = True )
234+ if text .strip ().lower () == "help" :
235+ print_formatted_text (HTML ("<u>Available commands:</u>" ))
236+ self ._print_command_help (self .session .completer .commands )
237+ elif text .strip ().lower () == "exit" :
238+ raise EOFError ()
239+ elif text .strip ().lower ().startswith ("client." ):
240+ self ._process_client (text , self .client )
241+ elif text .strip ().lower ().startswith ("result." ) and result :
242+ self ._process_result (text , result )
243+ except KeyboardInterrupt :
244+ continue # Control-C pressed. Try again.
245+ except EOFError :
246+ break # Control-D pressed.
247+ except Exception as exc : # pylint: disable=broad-except
248+ click .secho (str (exc ), fg = "red" )
249+
250+ click .secho ("GoodBye!" , fg = "blue" )
234251
235252
236253@click .group ("pymodbus-repl" )
@@ -303,7 +320,8 @@ def tcp(ctx, host, port, framer):
303320 if framer == "rtu" :
304321 kwargs ["framer" ] = ModbusRtuFramer
305322 client = ModbusTcpClient (** kwargs )
306- cli (client )
323+ cli = CLI (client )
324+ cli .run ()
307325
308326
309327@main .command ("serial" )
@@ -423,7 +441,8 @@ def serial( # pylint: disable=too-many-arguments
423441 write_timeout = write_timeout ,
424442 ** ctx .obj ,
425443 )
426- cli (client )
444+ cli = CLI (client )
445+ cli .run ()
427446
428447
429448if __name__ == "__main__" :
0 commit comments