diff --git a/OMPython/OMCSession.py b/OMPython/OMCSession.py index 04615b9e..ff1a6cca 100644 --- a/OMPython/OMCSession.py +++ b/OMPython/OMCSession.py @@ -48,7 +48,7 @@ import sys import tempfile import time -from typing import Optional +from typing import Any, Optional import uuid import warnings import zmq @@ -57,12 +57,11 @@ from OMPython.OMTypedParser import parseString as om_parser_typed from OMPython.OMParser import om_parser_basic - # define logger using the current module name as ID logger = logging.getLogger(__name__) -class DummyPopen(): +class DummyPopen: def __init__(self, pid): self.pid = pid self.process = psutil.Process(pid) @@ -84,14 +83,14 @@ class OMCSessionException(Exception): class OMCSessionCmd: - def __init__(self, session: OMCSessionZMQ, readonly: Optional[bool] = False): + def __init__(self, session: OMCSessionZMQ, readonly: bool = False): if not isinstance(session, OMCSessionZMQ): raise OMCSessionException("Invalid session definition!") self._session = session self._readonly = readonly - self._omc_cache = {} + self._omc_cache: dict[tuple[str, bool], Any] = {} - def _ask(self, question: str, opt: Optional[list[str]] = None, parsed: Optional[bool] = True): + def _ask(self, question: str, opt: Optional[list[str]] = None, parsed: bool = True): if opt is None: expression = question @@ -107,8 +106,6 @@ def _ask(self, question: str, opt: Optional[list[str]] = None, parsed: Optional[ if p in self._omc_cache: return self._omc_cache[p] - logger.debug('OMC ask: %s (parsed=%s)', expression, parsed) - try: res = self._session.sendExpression(expression, parsed=parsed) except OMCSessionException as ex: @@ -273,26 +270,29 @@ def getClassNames(self, className=None, recursive=False, qualified=False, sort=F class OMCSessionZMQ: - def __init__(self, timeout=10.00, - docker=None, dockerContainer=None, dockerExtraArgs=None, dockerOpenModelicaPath="omc", - dockerNetwork=None, port=None, omhome: str = None): + def __init__(self, + timeout: float = 10.00, + docker: Optional[str] = None, + dockerContainer: Optional[int] = None, + dockerExtraArgs: Optional[list] = None, + dockerOpenModelicaPath: str = "omc", + dockerNetwork: Optional[str] = None, + port: Optional[int] = None, + omhome: Optional[str] = None): if dockerExtraArgs is None: dockerExtraArgs = [] - self.omhome = self._get_omhome(omhome=omhome) + self._omhome = self._get_omhome(omhome=omhome) self._omc_process = None self._omc_command = None - self._omc = None - self._dockerCid = None + self._omc: Optional[Any] = None + self._dockerCid: Optional[int] = None self._serverIPAddress = "127.0.0.1" self._interactivePort = None - # FIXME: this code is not well written... need to be refactored self._temp_dir = pathlib.Path(tempfile.gettempdir()) # generate a random string for this session self._random_string = uuid.uuid4().hex - # omc log file - self._omc_log_file = None try: self._currentUser = getpass.getuser() if not self._currentUser: @@ -301,30 +301,28 @@ def __init__(self, timeout=10.00, # We are running as a uid not existing in the password database... Pretend we are nobody self._currentUser = "nobody" - # Locating and using the IOR - if sys.platform != 'win32' or docker or dockerContainer: - self._port_file = "openmodelica." + self._currentUser + ".port." + self._random_string - else: - self._port_file = "openmodelica.port." + self._random_string self._docker = docker self._dockerContainer = dockerContainer self._dockerExtraArgs = dockerExtraArgs self._dockerOpenModelicaPath = dockerOpenModelicaPath self._dockerNetwork = dockerNetwork - self._create_omc_log_file("port") + self._omc_log_file = self._create_omc_log_file("port") self._timeout = timeout - self._port_file = ((pathlib.Path("/tmp") if docker else self._temp_dir) / self._port_file).as_posix() + # Locating and using the IOR + if sys.platform != 'win32' or docker or dockerContainer: + port_file = "openmodelica." + self._currentUser + ".port." + self._random_string + else: + port_file = "openmodelica.port." + self._random_string + self._port_file = ((pathlib.Path("/tmp") if docker else self._temp_dir) / port_file).as_posix() self._interactivePort = port # set omc executable path and args - self._set_omc_command([ - "--interactive=zmq", - "--locale=C", - f"-z={self._random_string}" - ]) + self._omc_command = self._set_omc_command(omc_path_and_args_list=["--interactive=zmq", + "--locale=C", + f"-z={self._random_string}"]) # start up omc executable, which is waiting for the ZMQ connection - self._start_omc_process(timeout) + self._omc_process = self._start_omc_process(timeout) # connect to the running omc instance using ZMQ - self._connect_to_omc(timeout) + self._omc_port = self._connect_to_omc(timeout) self._re_log_entries = None self._re_log_raw = None @@ -344,27 +342,29 @@ def __del__(self): self._omc_process.kill() self._omc_process.wait() - def _create_omc_log_file(self, suffix): + def _create_omc_log_file(self, suffix): # output? if sys.platform == 'win32': log_filename = f"openmodelica.{suffix}.{self._random_string}.log" else: log_filename = f"openmodelica.{self._currentUser}.{suffix}.{self._random_string}.log" # this file must be closed in the destructor - self._omc_log_file = open(self._temp_dir / log_filename, "w+") + omc_log_file = open(self._temp_dir / log_filename, "w+") + + return omc_log_file - def _start_omc_process(self, timeout): + def _start_omc_process(self, timeout): # output? if sys.platform == 'win32': - omhome_bin = (self.omhome / "bin").as_posix() + omhome_bin = (self._omhome / "bin").as_posix() my_env = os.environ.copy() my_env["PATH"] = omhome_bin + os.pathsep + my_env["PATH"] - self._omc_process = subprocess.Popen(self._omc_command, stdout=self._omc_log_file, - stderr=self._omc_log_file, env=my_env) + omc_process = subprocess.Popen(self._omc_command, stdout=self._omc_log_file, + stderr=self._omc_log_file, env=my_env) else: # set the user environment variable so omc running from wsgi has the same user as OMPython my_env = os.environ.copy() my_env["USER"] = self._currentUser - self._omc_process = subprocess.Popen(self._omc_command, stdout=self._omc_log_file, - stderr=self._omc_log_file, env=my_env) + omc_process = subprocess.Popen(self._omc_command, stdout=self._omc_log_file, + stderr=self._omc_log_file, env=my_env) if self._docker: for i in range(0, 40): try: @@ -387,29 +387,30 @@ def _start_omc_process(self, timeout): dockerTop = None if self._docker or self._dockerContainer: if self._dockerNetwork == "separate": - self._serverIPAddress = json.loads(subprocess.check_output(["docker", "inspect", self._dockerCid]).decode().strip())[0]["NetworkSettings"]["IPAddress"] + output = subprocess.check_output(["docker", "inspect", self._dockerCid]).decode().strip() + self._serverIPAddress = json.loads(output)[0]["NetworkSettings"]["IPAddress"] for i in range(0, 40): if sys.platform == 'win32': break dockerTop = subprocess.check_output(["docker", "top", self._dockerCid]).decode().strip() - self._omc_process = None + omc_process = None for line in dockerTop.split("\n"): columns = line.split() if self._random_string in line: try: - self._omc_process = DummyPopen(int(columns[1])) + omc_process = DummyPopen(int(columns[1])) except psutil.NoSuchProcess: raise OMCSessionException( f"Could not find PID {dockerTop} - is this a docker instance spawned " f"without --pid=host?\nLog-file says:\n{open(self._omc_log_file.name).read()}") break - if self._omc_process is not None: + if omc_process is not None: break time.sleep(timeout / 40.0) - if self._omc_process is None: + if omc_process is None: raise OMCSessionException("Docker top did not contain omc process %s:\n%s\nLog-file says:\n%s" % (self._random_string, dockerTop, open(self._omc_log_file.name).read())) - return self._omc_process + return omc_process def _getuid(self): """ @@ -419,7 +420,7 @@ def _getuid(self): """ return 1000 if sys.platform == 'win32' else os.getuid() - def _set_omc_command(self, omc_path_and_args_list): + def _set_omc_command(self, omc_path_and_args_list) -> list: """Define the command that will be called by the subprocess module. On Windows, use the list input style of the subprocess module to @@ -446,20 +447,31 @@ def _set_omc_command(self, omc_path_and_args_list): else: raise OMCSessionException('dockerNetwork was set to %s, but only \"host\" or \"separate\" is allowed') self._dockerCidFile = self._omc_log_file.name + ".docker.cid" - omcCommand = ["docker", "run", "--cidfile", self._dockerCidFile, "--rm", "--env", "USER=%s" % self._currentUser, "--user", str(self._getuid())] + self._dockerExtraArgs + dockerNetworkStr + [self._docker, self._dockerOpenModelicaPath] + omcCommand = (["docker", "run", + "--cidfile", self._dockerCidFile, + "--rm", + "--env", "USER=%s" % self._currentUser, + "--user", str(self._getuid())] + + self._dockerExtraArgs + + dockerNetworkStr + + [self._docker, self._dockerOpenModelicaPath]) elif self._dockerContainer: - omcCommand = ["docker", "exec", "--env", "USER=%s" % self._currentUser, "--user", str(self._getuid())] + self._dockerExtraArgs + [self._dockerContainer, self._dockerOpenModelicaPath] + omcCommand = (["docker", "exec", + "--env", "USER=%s" % self._currentUser, + "--user", str(self._getuid())] + + self._dockerExtraArgs + + [self._dockerContainer, self._dockerOpenModelicaPath]) self._dockerCid = self._dockerContainer else: omcCommand = [str(self._get_omc_path())] if self._interactivePort: extraFlags = extraFlags + ["--interactivePort=%d" % int(self._interactivePort)] - self._omc_command = omcCommand + omc_path_and_args_list + extraFlags + omc_command = omcCommand + omc_path_and_args_list + extraFlags - return self._omc_command + return omc_command - def _get_omhome(self, omhome: str = None): + def _get_omhome(self, omhome: Optional[str] = None): # use the provided path if omhome is not None: return pathlib.Path(omhome) @@ -477,18 +489,20 @@ def _get_omhome(self, omhome: str = None): raise OMCSessionException("Cannot find OpenModelica executable, please install from openmodelica.org") def _get_omc_path(self) -> pathlib.Path: - return self.omhome / "bin" / "omc" + return self._omhome / "bin" / "omc" - def _connect_to_omc(self, timeout): - self._omc_zeromq_uri = "file:///" + self._port_file + def _connect_to_omc(self, timeout) -> str: + omc_zeromq_uri = "file:///" + self._port_file # See if the omc server is running attempts = 0 - self._port = None + port = None while True: if self._dockerCid: try: - self._port = subprocess.check_output(["docker", "exec", self._dockerCid, "cat", self._port_file], - stderr=subprocess.DEVNULL).decode().strip() + port = subprocess.check_output(args=["docker", + "exec", str(self._dockerCid), + "cat", str(self._port_file)], + stderr=subprocess.DEVNULL).decode().strip() break except subprocess.CalledProcessError: pass @@ -496,7 +510,7 @@ def _connect_to_omc(self, timeout): if os.path.isfile(self._port_file): # Read the port file with open(self._port_file, 'r') as f_p: - self._port = f_p.readline() + port = f_p.readline() os.remove(self._port_file) break @@ -506,18 +520,21 @@ def _connect_to_omc(self, timeout): self._omc_log_file.close() logger.error("OMC Server did not start. Please start it! Log-file says:\n%s" % open(name).read()) raise OMCSessionException(f"OMC Server did not start (timeout={timeout}). " - "Could not open file {self._port_file}") + f"Could not open file {self._port_file}") time.sleep(timeout / 80.0) - self._port = self._port.replace("0.0.0.0", self._serverIPAddress) - logger.info(f"OMC Server is up and running at {self._omc_zeromq_uri} pid={self._omc_process.pid} cid={self._dockerCid}") + port = port.replace("0.0.0.0", self._serverIPAddress) + logger.info(f"OMC Server is up and running at {omc_zeromq_uri} " + f"pid={self._omc_process.pid if self._omc_process else '?'} cid={self._dockerCid}") # Create the ZeroMQ socket and connect to OMC server context = zmq.Context.instance() self._omc = context.socket(zmq.REQ) self._omc.setsockopt(zmq.LINGER, 0) # Dismisses pending messages if closed self._omc.setsockopt(zmq.IMMEDIATE, True) # Queue messages only to completed connections - self._omc.connect(self._port) + self._omc.connect(port) + + return port def execute(self, command): warnings.warn("This function is depreciated and will be removed in future versions; " @@ -533,6 +550,8 @@ def sendExpression(self, command, parsed=True): if self._omc is None: raise OMCSessionException("No OMC running. Create a new instance of OMCSessionZMQ!") + logger.debug("sendExpression(%r, parsed=%r)", command, parsed) + attempts = 0 while True: try: