diff --git a/OMPython/ModelicaSystem.py b/OMPython/ModelicaSystem.py index ae480dde..09052ec3 100644 --- a/OMPython/ModelicaSystem.py +++ b/OMPython/ModelicaSystem.py @@ -33,23 +33,21 @@ """ import ast -import csv from dataclasses import dataclass +import itertools import logging import numbers import numpy as np import os -import pathlib -import platform -import re -import subprocess -import tempfile +import queue import textwrap -from typing import Optional, Any +import threading +from typing import Any, Optional import warnings import xml.etree.ElementTree as ET -from OMPython.OMCSession import OMCSessionException, OMCSessionZMQ, OMCProcessLocal +from OMPython.OMCSession import (OMCSessionException, OMCSessionRunData, OMCSessionZMQ, + OMCProcess, OMCPath) # define logger using the current module name as ID logger = logging.getLogger(__name__) @@ -115,14 +113,21 @@ def __getitem__(self, index: int): class ModelicaSystemCmd: """A compiled model executable.""" - def __init__(self, runpath: pathlib.Path, modelname: str, timeout: Optional[float] = None) -> None: - self._runpath = pathlib.Path(runpath).resolve().absolute() + def __init__( + self, + session: OMCSessionZMQ, + runpath: OMCPath, + modelname: str, + timeout: Optional[float] = None, + ) -> None: + self._session = session + self._runpath = runpath self._model_name = modelname self._timeout = timeout self._args: dict[str, str | None] = {} self._arg_override: dict[str, str] = {} - def arg_set(self, key: str, val: Optional[str | dict] = None) -> None: + def arg_set(self, key: str, val: Optional[str | dict[str, Any]] = None) -> None: """ Set one argument for the executable model. @@ -165,7 +170,7 @@ def arg_get(self, key: str) -> Optional[str | dict]: return None - def args_set(self, args: dict[str, Optional[str | dict[str, str]]]) -> None: + def args_set(self, args: dict[str, Optional[str | dict[str, Any]]]) -> None: """ Define arguments for the model executable. @@ -176,27 +181,12 @@ def args_set(self, args: dict[str, Optional[str | dict[str, str]]]) -> None: for arg in args: self.arg_set(key=arg, val=args[arg]) - def get_exe(self) -> pathlib.Path: - """Get the path to the compiled model executable.""" - if platform.system() == "Windows": - path_exe = self._runpath / f"{self._model_name}.exe" - else: - path_exe = self._runpath / self._model_name - - if not path_exe.exists(): - raise ModelicaSystemError(f"Application file path not found: {path_exe}") - - return path_exe - - def get_cmd(self) -> list: - """Get a list with the path to the executable and all command line args. - - This can later be used as an argument for subprocess.run(). + def get_cmd_args(self) -> list[str]: + """ + Get a list with the command arguments for the model executable. """ - path_exe = self.get_exe() - - cmdl = [path_exe.as_posix()] + cmdl = [] for key in self._args: if self._args[key] is None: cmdl.append(f"-{key}") @@ -205,54 +195,26 @@ def get_cmd(self) -> list: return cmdl - def run(self) -> int: - """Run the requested simulation. - - Returns - ------- - Subprocess return code (0 on success). + def definition(self) -> OMCSessionRunData: """ + Define all needed data to run the model executable. The data is stored in an OMCSessionRunData object. + """ + # ensure that a result filename is provided + result_file = self.arg_get('r') + if not isinstance(result_file, str): + result_file = (self._runpath / f"{self._model_name}.mat").as_posix() + + omc_run_data = OMCSessionRunData( + cmd_path=self._runpath.as_posix(), + cmd_model_name=self._model_name, + cmd_args=self.get_cmd_args(), + cmd_result_path=result_file, + cmd_timeout=self._timeout, + ) - cmdl: list = self.get_cmd() - - logger.debug("Run OM command %s in %s", repr(cmdl), self._runpath.as_posix()) - - if platform.system() == "Windows": - path_dll = "" - - # set the process environment from the generated .bat file in windows which should have all the dependencies - path_bat = self._runpath / f"{self._model_name}.bat" - if not path_bat.exists(): - raise ModelicaSystemError("Batch file (*.bat) does not exist " + str(path_bat)) - - with open(file=path_bat, mode='r', encoding='utf-8') as fh: - for line in fh: - match = re.match(r"^SET PATH=([^%]*)", line, re.IGNORECASE) - if match: - path_dll = match.group(1).strip(';') # Remove any trailing semicolons - my_env = os.environ.copy() - my_env["PATH"] = path_dll + os.pathsep + my_env["PATH"] - else: - # TODO: how to handle path to resources of external libraries for any system not Windows? - my_env = None - - try: - cmdres = subprocess.run(cmdl, capture_output=True, text=True, env=my_env, cwd=self._runpath, - timeout=self._timeout, check=True) - stdout = cmdres.stdout.strip() - stderr = cmdres.stderr.strip() - returncode = cmdres.returncode - - logger.debug("OM output for command %s:\n%s", repr(cmdl), stdout) - - if stderr: - raise ModelicaSystemError(f"Error running command {repr(cmdl)}: {stderr}") - except subprocess.TimeoutExpired as ex: - raise ModelicaSystemError(f"Timeout running command {repr(cmdl)}") from ex - except subprocess.CalledProcessError as ex: - raise ModelicaSystemError(f"Error running command {repr(cmdl)}") from ex + omc_run_data_updated = self._session.omc_run_data_update(omc_run_data=omc_run_data) - return returncode + return omc_run_data_updated @staticmethod def parse_simflags(simflags: str) -> dict[str, Optional[str | dict[str, str]]]: @@ -296,14 +258,14 @@ def parse_simflags(simflags: str) -> dict[str, Optional[str | dict[str, str]]]: class ModelicaSystem: def __init__( self, - fileName: Optional[str | os.PathLike | pathlib.Path] = None, + fileName: Optional[str | os.PathLike] = None, modelName: Optional[str] = None, lmodel: Optional[list[str | tuple[str, str]]] = None, commandLineOptions: Optional[str] = None, variableFilter: Optional[str] = None, - customBuildDirectory: Optional[str | os.PathLike | pathlib.Path] = None, + customBuildDirectory: Optional[str | os.PathLike] = None, omhome: Optional[str] = None, - omc_process: Optional[OMCProcessLocal] = None, + omc_process: Optional[OMCProcess] = None, build: bool = True, ) -> None: """Initialize, load and build a model. @@ -368,11 +330,9 @@ def __init__( self._linearized_states: list[str] = [] # linearization states list if omc_process is not None: - if not isinstance(omc_process, OMCProcessLocal): - raise ModelicaSystemError("Invalid (local) omc process definition provided!") - self._getconn = OMCSessionZMQ(omc_process=omc_process) + self._session = OMCSessionZMQ(omc_process=omc_process) else: - self._getconn = OMCSessionZMQ(omhome=omhome) + self._session = OMCSessionZMQ(omhome=omhome) # set commandLineOptions if provided by users self.setCommandLineOptions(commandLineOptions=commandLineOptions) @@ -383,12 +343,15 @@ def __init__( if not isinstance(lmodel, list): raise ModelicaSystemError(f"Invalid input type for lmodel: {type(lmodel)} - list expected!") - self._xml_file = None self._lmodel = lmodel # may be needed if model is derived from other model self._model_name = modelName # Model class name - self._file_name = pathlib.Path(fileName).resolve() if fileName is not None else None # Model file/package name + if fileName is not None: + file_name = self._session.omcpath(fileName).resolve() + else: + file_name = None + self._file_name: Optional[OMCPath] = file_name # Model file/package name self._simulated = False # True if the model has already been simulated - self._result_file: Optional[pathlib.Path] = None # for storing result file + self._result_file: Optional[OMCPath] = None # for storing result file self._variable_filter = variableFilter if self._file_name is not None and not self._file_name.is_file(): # if file does not exist @@ -400,7 +363,7 @@ def __init__( self.setCommandLineOptions("--linearizationDumpLanguage=python") self.setCommandLineOptions("--generateSymbolicLinearization") - self._tempdir = self.setTempDirectory(customBuildDirectory) + self._work_dir: OMCPath = self.setWorkDirectory(customBuildDirectory) if self._file_name is not None: self._loadLibrary(lmodel=self._lmodel) @@ -413,6 +376,9 @@ def __init__( if build: self.buildModel(variableFilter) + def get_session(self) -> OMCSessionZMQ: + return self._session + def setCommandLineOptions(self, commandLineOptions: Optional[str] = None): # set commandLineOptions if provided by users if commandLineOptions is None: @@ -420,7 +386,7 @@ def setCommandLineOptions(self, commandLineOptions: Optional[str] = None): exp = f'setCommandLineOptions("{commandLineOptions}")' self.sendExpression(exp) - def _loadFile(self, fileName: pathlib.Path): + def _loadFile(self, fileName: OMCPath): # load file self.sendExpression(f'loadFile("{fileName.as_posix()}")') @@ -448,25 +414,34 @@ def _loadLibrary(self, lmodel: list): '1)["Modelica"]\n' '2)[("Modelica","3.2.3"), "PowerSystems"]\n') - def setTempDirectory(self, customBuildDirectory: Optional[str | os.PathLike | pathlib.Path] = None) -> pathlib.Path: - # create a unique temp directory for each session and build the model in that directory + def setWorkDirectory(self, customBuildDirectory: Optional[str | os.PathLike] = None) -> OMCPath: + """ + Define the work directory for the ModelicaSystem / OpenModelica session. The model is build within this + directory. If no directory is defined a unique temporary directory is created. + """ if customBuildDirectory is not None: - if not os.path.exists(customBuildDirectory): - raise IOError(f"{customBuildDirectory} does not exist") - tempdir = pathlib.Path(customBuildDirectory).absolute() + workdir = self._session.omcpath(customBuildDirectory).absolute() + if not workdir.is_dir(): + raise IOError(f"Provided work directory does not exists: {customBuildDirectory}!") else: - tempdir = pathlib.Path(tempfile.mkdtemp()).absolute() - if not tempdir.is_dir(): - raise IOError(f"{tempdir} could not be created") + workdir = self._session.omcpath_tempdir().absolute() + if not workdir.is_dir(): + raise IOError(f"{workdir} could not be created") - logger.info("Define tempdir as %s", tempdir) - exp = f'cd("{tempdir.as_posix()}")' + logger.info("Define work dir as %s", workdir) + exp = f'cd("{workdir.as_posix()}")' self.sendExpression(exp) - return tempdir + # set the class variable _tempdir ... + self._work_dir = workdir + # ... and also return the defined path + return workdir - def getWorkDirectory(self) -> pathlib.Path: - return self._tempdir + def getWorkDirectory(self) -> OMCPath: + """ + Return the defined working directory for this ModelicaSystem / OpenModelica session. + """ + return self._work_dir def buildModel(self, variableFilter: Optional[str] = None): if variableFilter is not None: @@ -480,12 +455,26 @@ def buildModel(self, variableFilter: Optional[str] = None): buildModelResult = self._requestApi("buildModel", self._model_name, properties=varFilter) logger.debug("OM model build result: %s", buildModelResult) - self._xml_file = pathlib.Path(buildModelResult[0]).parent / buildModelResult[1] - self._xmlparse() + # check if the executable exists ... + om_cmd = ModelicaSystemCmd( + session=self._session, + runpath=self.getWorkDirectory(), + modelname=self._model_name, + timeout=5.0, + ) + # ... by running it - output help for command help + om_cmd.arg_set(key="help", val="help") + cmd_definition = om_cmd.definition() + returncode = self._session.run_model_executable(cmd_run_data=cmd_definition) + if returncode != 0: + raise ModelicaSystemError("Model executable not working!") + + xml_file = self._session.omcpath(buildModelResult[0]).parent / buildModelResult[1] + self._xmlparse(xml_file=xml_file) def sendExpression(self, expr: str, parsed: bool = True): try: - retval = self._getconn.sendExpression(expr, parsed) + retval = self._session.sendExpression(expr, parsed) except OMCSessionException as ex: raise ModelicaSystemError(f"Error executing {repr(expr)}") from ex @@ -507,23 +496,34 @@ def _requestApi(self, apiName, entity=None, properties=None): # 2 return self.sendExpression(exp) - def _xmlparse(self): - if not self._xml_file.is_file(): - raise ModelicaSystemError(f"XML file not generated: {self._xml_file}") + def _xmlparse(self, xml_file: OMCPath): + if not xml_file.is_file(): + raise ModelicaSystemError(f"XML file not generated: {xml_file}") - tree = ET.parse(self._xml_file) + xml_content = xml_file.read_text() + tree = ET.ElementTree(ET.fromstring(xml_content)) rootCQ = tree.getroot() for attr in rootCQ.iter('DefaultExperiment'): for key in ("startTime", "stopTime", "stepSize", "tolerance", "solver", "outputFormat"): - self._simulate_options[key] = attr.get(key) + self._simulate_options[key] = str(attr.get(key)) for sv in rootCQ.iter('ScalarVariable'): - scalar = {} - for key in ("name", "description", "variability", "causality", "alias"): - scalar[key] = sv.get(key) - scalar["changeable"] = sv.get('isValueChangeable') - scalar["aliasvariable"] = sv.get('aliasVariable') + translations = { + "alias": "alias", + "aliasvariable": "aliasVariable", + "causality": "causality", + "changeable": "isValueChangeable", + "description": "description", + "name": "name", + "variability": "variability", + } + + scalar: dict[str, Any] = {} + for key_dst, key_src in translations.items(): + val = sv.get(key_src) + scalar[key_dst] = None if val is None else str(val) + ch = list(sv) for att in ch: scalar["start"] = att.get('start') @@ -531,6 +531,7 @@ def _xmlparse(self): scalar["max"] = att.get('max') scalar["unit"] = att.get('unit') + # save parameters in the corresponding class variables if scalar["variability"] == "parameter": if scalar["name"] in self._override_variables: self._params[scalar["name"]] = self._override_variables[scalar["name"]] @@ -915,7 +916,7 @@ def getOptimizationOptions(self, names: Optional[str | list[str]] = None) -> dic def simulate_cmd( self, - result_file: pathlib.Path, + result_file: OMCPath, simflags: Optional[str] = None, simargs: Optional[dict[str, Optional[str | dict[str, str]]]] = None, timeout: Optional[float] = None, @@ -942,7 +943,12 @@ def simulate_cmd( An instance if ModelicaSystemCmd to run the requested simulation. """ - om_cmd = ModelicaSystemCmd(runpath=self._tempdir, modelname=self._model_name, timeout=timeout) + om_cmd = ModelicaSystemCmd( + session=self._session, + runpath=self.getWorkDirectory(), + modelname=self._model_name, + timeout=timeout, + ) # always define the result file to use om_cmd.arg_set(key="r", val=result_file.as_posix()) @@ -954,16 +960,17 @@ def simulate_cmd( if simargs: om_cmd.args_set(args=simargs) - overrideFile = self._tempdir / f"{self._model_name}_override.txt" if self._override_variables or self._simulate_options_override: - tmpdict = self._override_variables.copy() - tmpdict.update(self._simulate_options_override) - # write to override file - with open(file=overrideFile, mode="w", encoding="utf-8") as fh: - for key, value in tmpdict.items(): - fh.write(f"{key}={value}\n") + override_file = result_file.parent / f"{result_file.stem}_override.txt" + + override_content = ( + "\n".join([f"{key}={value}" for key, value in self._override_variables.items()]) + + "\n".join([f"{key}={value}" for key, value in self._simulate_options_override.items()]) + + "\n" + ) - om_cmd.arg_set(key="overrideFile", val=overrideFile.as_posix()) + override_file.write_text(override_content) + om_cmd.arg_set(key="overrideFile", val=override_file.as_posix()) if self._inputs: # if model has input quantities for key in self._inputs: @@ -1013,11 +1020,16 @@ def simulate( if resultfile is None: # default result file generated by OM - self._result_file = self._tempdir / f"{self._model_name}_res.mat" - elif os.path.exists(resultfile): - self._result_file = pathlib.Path(resultfile) + self._result_file = self.getWorkDirectory() / f"{self._model_name}_res.mat" + elif isinstance(resultfile, OMCPath): + self._result_file = resultfile else: - self._result_file = self._tempdir / resultfile + self._result_file = self._session.omcpath(resultfile) + if not self._result_file.is_absolute(): + self._result_file = self.getWorkDirectory() / resultfile + + if not isinstance(self._result_file, OMCPath): + raise ModelicaSystemError(f"Invalid result file path: {self._result_file} - must be an OMCPath object!") om_cmd = self.simulate_cmd( result_file=self._result_file, @@ -1030,13 +1042,14 @@ def simulate( if self._result_file.is_file(): self._result_file.unlink() # ... run simulation ... - returncode = om_cmd.run() + cmd_definition = om_cmd.definition() + returncode = self._session.run_model_executable(cmd_run_data=cmd_definition) # and check returncode *AND* resultfile if returncode != 0 and self._result_file.is_file(): # check for an empty (=> 0B) result file which indicates a crash of the model executable # see: https://github.com/OpenModelica/OMPython/issues/261 # https://github.com/OpenModelica/OpenModelica/issues/13829 - if self._result_file.stat().st_size == 0: + if self._result_file.size() == 0: self._result_file.unlink() raise ModelicaSystemError("Empty result file - this indicates a crash of the model executable!") @@ -1044,7 +1057,11 @@ def simulate( self._simulated = True - def getSolutions(self, varList: Optional[str | list[str]] = None, resultfile: Optional[str] = None) -> tuple[str] | np.ndarray: + def getSolutions( + self, + varList: Optional[str | list[str]] = None, + resultfile: Optional[str | os.PathLike] = None, + ) -> tuple[str] | np.ndarray: """Extract simulation results from a result data file. Args: @@ -1081,7 +1098,7 @@ def getSolutions(self, varList: Optional[str | list[str]] = None, resultfile: Op raise ModelicaSystemError("No result file found. Run simulate() first.") result_file = self._result_file else: - result_file = pathlib.Path(resultfile) + result_file = self._session.omcpath(resultfile) # check for result file exits if not result_file.is_file(): @@ -1373,7 +1390,7 @@ def setInputs( return True - def _createCSVData(self, csvfile: Optional[pathlib.Path] = None) -> pathlib.Path: + def _createCSVData(self, csvfile: Optional[OMCPath] = None) -> OMCPath: """ Create a csv file with inputs for the simulation/optimization of the model. If csvfile is provided as argument, this file is used; else a generic file name is created. @@ -1419,11 +1436,12 @@ def _createCSVData(self, csvfile: Optional[pathlib.Path] = None) -> pathlib.Path csv_rows.append(row) if csvfile is None: - csvfile = self._tempdir / f'{self._model_name}.csv' + csvfile = self.getWorkDirectory() / f'{self._model_name}.csv' - with open(file=csvfile, mode="w", encoding="utf-8", newline="") as fh: - writer = csv.writer(fh) - writer.writerows(csv_rows) + # basic definition of a CSV file using csv_rows as input + csv_content = "\n".join([",".join(map(str, row)) for row in csv_rows]) + "\n" + + csvfile.write_text(csv_content) return csvfile @@ -1534,24 +1552,29 @@ def linearize(self, lintime: Optional[float] = None, simflags: Optional[str] = N * `result = linearize(); A = result[0]` mostly just for backwards compatibility, because linearize() used to return `[A, B, C, D]`. """ - - if self._xml_file is None: + if len(self._quantities) == 0: + # if self._quantities has no content, the xml file was not parsed; see self._xmlparse() raise ModelicaSystemError( "Linearization cannot be performed as the model is not build, " "use ModelicaSystem() to build the model first" ) - om_cmd = ModelicaSystemCmd(runpath=self._tempdir, modelname=self._model_name, timeout=timeout) - - overrideLinearFile = self._tempdir / f'{self._model_name}_override_linear.txt' + om_cmd = ModelicaSystemCmd( + session=self._session, + runpath=self.getWorkDirectory(), + modelname=self._model_name, + timeout=timeout, + ) - with open(file=overrideLinearFile, mode="w", encoding="utf-8") as fh: - for key, value in self._override_variables.items(): - fh.write(f"{key}={value}\n") - for key, value in self._linearization_options.items(): - fh.write(f"{key}={value}\n") + override_content = ( + "\n".join([f"{key}={value}" for key, value in self._override_variables.items()]) + + "\n".join([f"{key}={value}" for key, value in self._linearization_options.items()]) + + "\n" + ) + override_file = self.getWorkDirectory() / f'{self._model_name}_override_linear.txt' + override_file.write_text(override_content) - om_cmd.arg_set(key="overrideFile", val=overrideLinearFile.as_posix()) + om_cmd.arg_set(key="overrideFile", val=override_file.as_posix()) if self._inputs: for key in self._inputs: @@ -1573,19 +1596,18 @@ def linearize(self, lintime: Optional[float] = None, simflags: Optional[str] = N om_cmd.args_set(args=simargs) # the file create by the model executable which contains the matrix and linear inputs, outputs and states - linear_file = self._tempdir / "linearized_model.py" - + linear_file = self.getWorkDirectory() / "linearized_model.py" linear_file.unlink(missing_ok=True) - returncode = om_cmd.run() + cmd_definition = om_cmd.definition() + returncode = self._session.run_model_executable(cmd_run_data=cmd_definition) if returncode != 0: raise ModelicaSystemError(f"Linearize failed with return code: {returncode}") + if not linear_file.is_file(): + raise ModelicaSystemError(f"Linearization failed: {linear_file} not found!") self._simulated = True - if not linear_file.exists(): - raise ModelicaSystemError(f"Linearization failed: {linear_file} not found!") - # extract data from the python file with the linearized model using the ast module - this allows to get the # needed information without executing the created code linear_data = {} @@ -1637,3 +1659,394 @@ def getLinearOutputs(self) -> list[str]: def getLinearStates(self) -> list[str]: """Get names of state variables of the linearized model.""" return self._linearized_states + + +class ModelicaSystemDoE: + """ + Class to run DoEs based on a (Open)Modelica model using ModelicaSystem + + Example + ------- + ``` + import OMPython + import pathlib + + + def run_doe(): + mypath = pathlib.Path('.') + + model = mypath / "M.mo" + model.write_text( + " model M\n" + " parameter Integer p=1;\n" + " parameter Integer q=1;\n" + " parameter Real a = -1;\n" + " parameter Real b = -1;\n" + " Real x[p];\n" + " Real y[q];\n" + " equation\n" + " der(x) = a * fill(1.0, p);\n" + " der(y) = b * fill(1.0, q);\n" + " end M;\n" + ) + + param = { + # structural + 'p': [1, 2], + 'q': [3, 4], + # simple + 'a': [5, 6], + 'b': [7, 8], + } + + resdir = mypath / 'DoE' + resdir.mkdir(exist_ok=True) + + doe_mod = OMPython.ModelicaSystemDoE( + fileName=model.as_posix(), + modelName="M", + parameters=param, + resultpath=resdir, + simargs={"override": {'stopTime': 1.0}}, + ) + doe_mod.prepare() + doe_def = doe_mod.get_doe_definition() + doe_mod.simulate() + doe_sol = doe_mod.get_doe_solutions() + + # ... work with doe_def and doe_sol ... + + + if __name__ == "__main__": + run_doe() + ``` + + """ + + DICT_RESULT_FILENAME: str = 'result filename' + DICT_RESULT_AVAILABLE: str = 'result available' + + def __init__( + self, + # data to be used for ModelicaSystem + fileName: Optional[str | os.PathLike] = None, + modelName: Optional[str] = None, + lmodel: Optional[list[str | tuple[str, str]]] = None, + commandLineOptions: Optional[str] = None, + variableFilter: Optional[str] = None, + customBuildDirectory: Optional[str | os.PathLike] = None, + omhome: Optional[str] = None, + omc_process: Optional[OMCProcess] = None, + # simulation specific input + # TODO: add more settings (simulation options, input options, ...) + simargs: Optional[dict[str, Optional[str | dict[str, Any]]]] = None, + timeout: Optional[int] = None, + # DoE specific inputs + resultpath: Optional[str | os.PathLike] = None, + parameters: Optional[dict[str, list[str] | list[int] | list[float]]] = None, + ) -> None: + """ + Initialisation of ModelicaSystemDoE. The parameters are based on: ModelicaSystem.__init__() and + ModelicaSystem.simulate(). Additionally, the path to store the result files is needed (= resultpath) as well as + a list of parameters to vary for the Doe (= parameters). All possible combinations are considered. + """ + + self._mod = ModelicaSystem( + fileName=fileName, + modelName=modelName, + lmodel=lmodel, + commandLineOptions=commandLineOptions, + variableFilter=variableFilter, + customBuildDirectory=customBuildDirectory, + omhome=omhome, + omc_process=omc_process, + ) + + self._model_name = modelName + + self._simargs = simargs + self._timeout = timeout + + if resultpath is None: + self._resultpath = self._mod.get_session().omcpath_tempdir() + else: + self._resultpath = self._mod.get_session().omcpath(resultpath) + if not self._resultpath.is_dir(): + raise ModelicaSystemError("Argument resultpath must be set to a valid path within the environment used " + f"for the OpenModelica session: {resultpath}!") + + if isinstance(parameters, dict): + self._parameters = parameters + else: + self._parameters = {} + + self._doe_def: Optional[dict[str, dict[str, Any]]] = None + self._doe_cmd: Optional[dict[str, OMCSessionRunData]] = None + + def prepare(self) -> int: + """ + Prepare the DoE by evaluating the parameters. Each structural parameter requires a new instance of + ModelicaSystem while the non-structural parameters can just be set on the executable. + + The return value is the number of simulation defined. + """ + + doe_sim = {} + doe_def = {} + + param_structure = {} + param_simple = {} + for param_name in self._parameters.keys(): + changeable = self._mod.isParameterChangeable(name=param_name) + logger.info(f"Parameter {repr(param_name)} is changeable? {changeable}") + + if changeable: + param_simple[param_name] = self._parameters[param_name] + else: + param_structure[param_name] = self._parameters[param_name] + + param_structure_combinations = list(itertools.product(*param_structure.values())) + param_simple_combinations = list(itertools.product(*param_simple.values())) + + for idx_pc_structure, pc_structure in enumerate(param_structure_combinations): + + build_dir = self._resultpath / f"DOE_{idx_pc_structure:09d}" + build_dir.mkdir() + self._mod.setWorkDirectory(customBuildDirectory=build_dir) + + sim_param_structure = {} + for idx_structure, pk_structure in enumerate(param_structure.keys()): + sim_param_structure[pk_structure] = pc_structure[idx_structure] + + pk_value = pc_structure[idx_structure] + if isinstance(pk_value, str): + expression = f"setParameterValue({self._model_name}, {pk_structure}, \"{pk_value}\")" + elif isinstance(pk_value, bool): + pk_value_bool_str = "true" if pk_value else "false" + expression = f"setParameterValue({self._model_name}, {pk_structure}, {pk_value_bool_str});" + else: + expression = f"setParameterValue({self._model_name}, {pk_structure}, {pk_value})" + res = self._mod.sendExpression(expression) + if not res: + raise ModelicaSystemError(f"Cannot set structural parameter {self._model_name}.{pk_structure} " + f"to {pk_value} using {repr(expression)}") + + self._mod.buildModel() + + for idx_pc_simple, pc_simple in enumerate(param_simple_combinations): + sim_param_simple = {} + for idx_simple, pk_simple in enumerate(param_simple.keys()): + sim_param_simple[pk_simple] = pc_simple[idx_simple] + + resfilename = f"DOE_{idx_pc_structure:09d}_{idx_pc_simple:09d}.mat" + logger.info(f"use result file {repr(resfilename)} " + f"for structural parameters: {sim_param_structure} " + f"and simple parameters: {sim_param_simple}") + resultfile = self._resultpath / resfilename + + df_data = ( + { + 'ID structure': idx_pc_structure, + } + | sim_param_structure + | { + 'ID non-structure': idx_pc_simple, + } + | sim_param_simple + | { + self.DICT_RESULT_AVAILABLE: False, + } + ) + + self._mod.setParameters(sim_param_simple) + mscmd = self._mod.simulate_cmd( + result_file=resultfile, + timeout=self._timeout, + ) + if self._simargs is not None: + mscmd.args_set(args=self._simargs) + cmd_definition = mscmd.definition() + del mscmd + + doe_sim[resfilename] = cmd_definition + doe_def[resfilename] = df_data + + logger.info(f"Prepared {len(doe_sim)} simulation definitions for the defined DoE.") + self._doe_cmd = doe_sim + self._doe_def = doe_def + + return len(doe_sim) + + def get_doe_definition(self) -> Optional[dict[str, dict[str, Any]]]: + """ + Get the defined DoE as a dict, where each key is the result filename and the value is a dict of simulation + settings including structural and non-structural parameters. + + The following code snippet can be used to convert the data to a pandas dataframe: + + ``` + import pandas as pd + + doe_dict = doe_mod.get_doe_definition() + doe_df = pd.DataFrame.from_dict(data=doe_dict, orient='index') + ``` + + """ + return self._doe_def + + def get_doe_command(self) -> Optional[dict[str, OMCSessionRunData]]: + """ + Get the definitions of simulations commands to run for this DoE. + """ + return self._doe_cmd + + def simulate( + self, + num_workers: int = 3, + ) -> bool: + """ + Simulate the DoE using the defined number of workers. + + Returns True if all simulations were done successfully, else False. + """ + + if self._doe_cmd is None or self._doe_def is None: + raise ModelicaSystemError("DoE preparation missing - call prepare() first!") + + doe_cmd_total = len(self._doe_cmd) + doe_def_total = len(self._doe_def) + + if doe_cmd_total != doe_def_total: + raise ModelicaSystemError(f"Mismatch between number simulation commands ({doe_cmd_total}) " + f"and simulation definitions ({doe_def_total}).") + + doe_task_query: queue.Queue = queue.Queue() + if self._doe_cmd is not None: + for doe_cmd in self._doe_cmd.values(): + doe_task_query.put(doe_cmd) + + if not isinstance(self._doe_def, dict) or len(self._doe_def) == 0: + raise ModelicaSystemError("Missing Doe Summary!") + + def worker(worker_id, task_queue): + while True: + try: + # Get the next task from the queue + cmd_definition = task_queue.get(block=False) + except queue.Empty: + logger.info(f"[Worker {worker_id}] No more simulations to run.") + break + + if cmd_definition is None: + raise ModelicaSystemError("Missing simulation definition!") + + resultfile = cmd_definition.cmd_result_path + resultpath = self._mod.get_session().omcpath(resultfile) + + logger.info(f"[Worker {worker_id}] Performing task: {resultpath.name}") + + try: + returncode = self._mod.get_session().run_model_executable(cmd_run_data=cmd_definition) + logger.info(f"[Worker {worker_id}] Simulation {resultpath.name} " + f"finished with return code: {returncode}") + except ModelicaSystemError as ex: + logger.warning(f"Simulation error for {resultpath.name}: {ex}") + + # Mark the task as done + task_queue.task_done() + + sim_query_done = doe_cmd_total - doe_task_query.qsize() + logger.info(f"[Worker {worker_id}] Task completed: {resultpath.name} " + f"({doe_cmd_total - sim_query_done}/{doe_cmd_total} = " + f"{(doe_cmd_total - sim_query_done) / doe_cmd_total * 100:.2f}% of tasks left)") + + # Create and start worker threads + logger.info(f"Start simulations for DoE with {doe_cmd_total} simulations " + f"using {num_workers} workers ...") + threads = [] + for i in range(num_workers): + thread = threading.Thread(target=worker, args=(i, doe_task_query)) + thread.start() + threads.append(thread) + + # Wait for all threads to complete + for thread in threads: + thread.join() + + doe_def_done = 0 + for resultfilename in self._doe_def: + resultfile = self._resultpath / resultfilename + + # include check for an empty (=> 0B) result file which indicates a crash of the model executable + # see: https://github.com/OpenModelica/OMPython/issues/261 + # https://github.com/OpenModelica/OpenModelica/issues/13829 + if resultfile.is_file() and resultfile.size() > 0: + self._doe_def[resultfilename][self.DICT_RESULT_AVAILABLE] = True + doe_def_done += 1 + + logger.info(f"All workers finished ({doe_def_done} of {doe_def_total} simulations with a result file).") + + return doe_def_total == doe_def_done + + def get_doe_solutions( + self, + var_list: Optional[list] = None, + ) -> Optional[tuple[str] | dict[str, dict[str, np.ndarray]]]: + """ + Get all solutions of the DoE run. The following return values are possible: + + * A list of variables if val_list == None + + * The Solutions as dict[str, pd.DataFrame] if a value list (== val_list) is defined. + + The following code snippet can be used to convert the solution data for each run to a pandas dataframe: + + ``` + import pandas as pd + + doe_sol = doe_mod.get_doe_solutions() + for key in doe_sol: + data = doe_sol[key]['data'] + if data: + doe_sol[key]['df'] = pd.DataFrame.from_dict(data=data) + else: + doe_sol[key]['df'] = None + ``` + + """ + if not isinstance(self._doe_def, dict): + return None + + if len(self._doe_def) == 0: + raise ModelicaSystemError("No result files available - all simulations did fail?") + + sol_dict: dict[str, dict[str, Any]] = {} + for resultfilename in self._doe_def: + resultfile = self._resultpath / resultfilename + + sol_dict[resultfilename] = {} + + if not self._doe_def[resultfilename][self.DICT_RESULT_AVAILABLE]: + msg = f"No result file available for {resultfilename}" + logger.warning(msg) + sol_dict[resultfilename]['msg'] = msg + sol_dict[resultfilename]['data'] = {} + continue + + if var_list is None: + var_list_row = list(self._mod.getSolutions(resultfile=resultfile.as_posix())) + else: + var_list_row = var_list + + try: + sol = self._mod.getSolutions(varList=var_list_row, resultfile=resultfile.as_posix()) + sol_data = {var: sol[idx] for idx, var in enumerate(var_list_row)} + sol_dict[resultfilename]['msg'] = 'Simulation available' + sol_dict[resultfilename]['data'] = sol_data + except ModelicaSystemError as ex: + msg = f"Error reading solution for {resultfilename}: {ex}" + logger.warning(msg) + sol_dict[resultfilename]['msg'] = msg + sol_dict[resultfilename]['data'] = {} + + return sol_dict diff --git a/OMPython/OMCSession.py b/OMPython/OMCSession.py index ac99dc05..df505f80 100644 --- a/OMPython/OMCSession.py +++ b/OMPython/OMCSession.py @@ -34,11 +34,14 @@ CONDITIONS OF OSMC-PL. """ +import abc +import dataclasses import io import json import logging import os import pathlib +import platform import psutil import pyparsing import re @@ -268,6 +271,232 @@ def getClassNames(self, className=None, recursive=False, qualified=False, sort=F return self._ask(question='getClassNames', opt=opt) +class OMCPathReal(pathlib.PurePosixPath): + """ + Implementation of a basic Path object which uses OMC as backend. The connection to OMC is provided via a + OMCSessionZMQ session object. + """ + + def __init__(self, *path, session: OMCSessionZMQ) -> None: + super().__init__(*path) + self._session = session + + def with_segments(self, *pathsegments): + """ + Create a new OMCPath object with the given path segments. + + The original definition of Path is overridden to ensure session is set. + """ + return type(self)(*pathsegments, session=self._session) + + def is_file(self, *, follow_symlinks=True) -> bool: + """ + Check if the path is a regular file. + """ + return self._session.sendExpression(f'regularFileExists("{self.as_posix()}")') + + def is_dir(self, *, follow_symlinks=True) -> bool: + """ + Check if the path is a directory. + """ + return self._session.sendExpression(f'directoryExists("{self.as_posix()}")') + + def read_text(self, encoding=None, errors=None, newline=None) -> str: + """ + Read the content of the file represented by this path as text. + + The additional arguments `encoding`, `errors` and `newline` are only defined for compatibility with Path() + definition. + """ + return self._session.sendExpression(f'readFile("{self.as_posix()}")') + + def write_text(self, data: str, encoding=None, errors=None, newline=None): + """ + Write text data to the file represented by this path. + + The additional arguments `encoding`, `errors`, and `newline` are only defined for compatibility with Path() + definitions. + """ + if not isinstance(data, str): + raise TypeError('data must be str, not %s' % + data.__class__.__name__) + + return self._session.sendExpression(f'writeFile("{self.as_posix()}", "{data}", false)') + + def mkdir(self, mode=0o777, parents=False, exist_ok=False): + """ + Create a directory at the path represented by this OMCPath object. + + The additional arguments `mode`, and `parents` are only defined for compatibility with Path() definitions. + """ + if self.is_dir() and not exist_ok: + raise FileExistsError(f"Directory {self.as_posix()} already exists!") + + return self._session.sendExpression(f'mkdir("{self.as_posix()}")') + + def cwd(self): + """ + Returns the current working directory as an OMCPath object. + """ + cwd_str = self._session.sendExpression('cd()') + return OMCPath(cwd_str, session=self._session) + + def unlink(self, missing_ok: bool = False) -> None: + """ + Unlink (delete) the file or directory represented by this path. + """ + res = self._session.sendExpression(f'deleteFile("{self.as_posix()}")') + if not res and not missing_ok: + raise FileNotFoundError(f"Cannot delete file {self.as_posix()} - it does not exists!") + + def resolve(self, strict: bool = False): + """ + Resolve the path to an absolute path. This is done based on available OMC functions. + """ + if strict and not (self.is_file() or self.is_dir()): + raise OMCSessionException(f"Path {self.as_posix()} does not exist!") + + if self.is_file(): + omcpath = self._omc_resolve(self.parent.as_posix()) / self.name + elif self.is_dir(): + omcpath = self._omc_resolve(self.as_posix()) + else: + raise OMCSessionException(f"Path {self.as_posix()} is neither a file nor a directory!") + + return omcpath + + def _omc_resolve(self, pathstr: str): + """ + Internal function to resolve the path of the OMCPath object using OMC functions *WITHOUT* changing the cwd + within OMC. + """ + expression = ('omcpath_cwd := cd(); ' + f'omcpath_check := cd("{pathstr}"); ' # check requested pathstring + 'cd(omcpath_cwd)') + + try: + result = self._session.sendExpression(command=expression, parsed=False) + result_parts = result.split('\n') + pathstr_resolved = result_parts[1] + pathstr_resolved = pathstr_resolved[1:-1] # remove quotes + + omcpath_resolved = self._session.omcpath(pathstr_resolved) + except OMCSessionException as ex: + raise OMCSessionException(f"OMCPath resolve failed for {pathstr}!") from ex + + if not omcpath_resolved.is_file() and not omcpath_resolved.is_dir(): + raise OMCSessionException(f"OMCPath resolve failed for {pathstr} - path does not exist!") + + return omcpath_resolved + + def absolute(self): + """ + Resolve the path to an absolute path. This is done by calling resolve() as it is the best we can do + using OMC functions. + """ + return self.resolve(strict=True) + + def exists(self, follow_symlinks=True) -> bool: + """ + Semi replacement for pathlib.Path.exists(). + """ + return self.is_file() or self.is_dir() + + def size(self) -> int: + """ + Get the size of the file in bytes - this is an extra function and the best we can do using OMC. + """ + if not self.is_file(): + raise OMCSessionException(f"Path {self.as_posix()} is not a file!") + + res = self._session.sendExpression(f'stat("{self.as_posix()}")') + if res[0]: + return int(res[1]) + + raise OMCSessionException(f"Error reading file size for path {self.as_posix()}!") + + +if sys.version_info < (3, 12): + + class OMCPathCompatibility(pathlib.Path): + """ + Compatibility class for OMCPath in Python < 3.12. This allows to run all code which uses OMCPath (mainly + ModelicaSystem) on these Python versions. There is one remaining limitation: only OMCProcessLocal will work as + OMCPathCompatibility is based on the standard pathlib.Path implementation. + """ + + # modified copy of pathlib.Path.__new__() definition + def __new__(cls, *args, **kwargs): + logger.warning("Python < 3.12 - using a version of class OMCPath " + "based on pathlib.Path for local usage only.") + + if cls is OMCPathCompatibility: + cls = OMCPathCompatibilityWindows if os.name == 'nt' else OMCPathCompatibilityPosix + self = cls._from_parts(args) + if not self._flavour.is_supported: + raise NotImplementedError("cannot instantiate %r on your system" + % (cls.__name__,)) + return self + + def size(self) -> int: + """ + Needed compatibility function to have the same interface as OMCPathReal + """ + return self.stat().st_size + + class OMCPathCompatibilityPosix(pathlib.PosixPath, OMCPathCompatibility): + pass + + class OMCPathCompatibilityWindows(pathlib.WindowsPath, OMCPathCompatibility): + pass + + OMCPath = OMCPathCompatibility + +else: + OMCPath = OMCPathReal + + +@dataclasses.dataclass +class OMCSessionRunData: + """ + Data class to store the command line data for running a model executable in the OMC environment. + + All data should be defined for the environment, where OMC is running (local, docker or WSL) + """ + # cmd_path is the expected working directory + cmd_path: str + cmd_model_name: str + # command line arguments for the model executable + cmd_args: list[str] + # result file with the simulation output + cmd_result_path: str + + # command prefix data (as list of strings); needed for docker or WSL + cmd_prefix: Optional[list[str]] = None + # cmd_model_executable is build out of cmd_path and cmd_model_name; this is mainly needed on Windows (add *.exe) + cmd_model_executable: Optional[str] = None + # additional library search path; this is mainly needed if OMCProcessLocal is run on Windows + cmd_library_path: Optional[str] = None + # command timeout + cmd_timeout: Optional[float] = 10.0 + + # working directory to be used on the *local* system + cmd_cwd_local: Optional[str] = None + + def get_cmd(self) -> list[str]: + """ + Get the command line to run the model executable in the environment defined by the OMCProcess definition. + """ + + if self.cmd_model_executable is None: + raise OMCSessionException("No model file defined for the model executable!") + + cmdl = [] if self.cmd_prefix is None else self.cmd_prefix + cmdl += [self.cmd_model_executable] + self.cmd_args + + return cmdl + + class OMCSessionZMQ: def __init__( @@ -322,6 +551,99 @@ def __del__(self): self.omc_zmq = None + def omcpath(self, *path) -> OMCPath: + """ + Create an OMCPath object based on the given path segments and the current OMC session. + """ + + # fallback solution for Python < 3.12; a modified pathlib.Path object is used as OMCPath replacement + if sys.version_info < (3, 12): + if isinstance(self.omc_process, OMCProcessLocal): + # noinspection PyArgumentList + return OMCPath(*path) + else: + raise OMCSessionException("OMCPath is supported for Python < 3.12 only if OMCProcessLocal is used!") + else: + return OMCPath(*path, session=self) + + def omcpath_tempdir(self, tempdir_base: Optional[OMCPath] = None) -> OMCPath: + """ + Get a temporary directory using OMC. It is our own implementation as non-local usage relies on OMC to run all + filesystem related access. + """ + names = [str(uuid.uuid4()) for _ in range(100)] + + if tempdir_base is None: + # fallback solution for Python < 3.12; a modified pathlib.Path object is used as OMCPath replacement + if sys.version_info < (3, 12): + tempdir_str = tempfile.gettempdir() + else: + tempdir_str = self.sendExpression("getTempDirectoryPath()") + tempdir_base = self.omcpath(tempdir_str) + + tempdir: Optional[OMCPath] = None + for name in names: + # create a unique temporary directory name + tempdir = tempdir_base / name + + if tempdir.exists(): + continue + + tempdir.mkdir(parents=True, exist_ok=False) + break + + if tempdir is None or not tempdir.is_dir(): + raise OMCSessionException("Cannot create a temporary directory!") + + return tempdir + + def omc_run_data_update(self, omc_run_data: OMCSessionRunData) -> OMCSessionRunData: + """ + Modify data based on the selected OMCProcess implementation. + + Needs to be implemented in the subclasses. + """ + return self.omc_process.omc_run_data_update(omc_run_data=omc_run_data) + + @staticmethod + def run_model_executable(cmd_run_data: OMCSessionRunData) -> int: + """ + Run the command defined in cmd_run_data. This class is defined as static method such that there is no need to + keep instances of over classes around. + """ + + my_env = os.environ.copy() + if isinstance(cmd_run_data.cmd_library_path, str): + my_env["PATH"] = cmd_run_data.cmd_library_path + os.pathsep + my_env["PATH"] + + cmdl = cmd_run_data.get_cmd() + + logger.debug("Run OM command %s in %s", repr(cmdl), cmd_run_data.cmd_path) + try: + cmdres = subprocess.run( + cmdl, + capture_output=True, + text=True, + env=my_env, + cwd=cmd_run_data.cmd_cwd_local, + timeout=cmd_run_data.cmd_timeout, + check=True, + ) + stdout = cmdres.stdout.strip() + stderr = cmdres.stderr.strip() + returncode = cmdres.returncode + + logger.debug("OM output for command %s:\n%s", repr(cmdl), stdout) + + if stderr: + raise OMCSessionException(f"Error running model executable {repr(cmdl)}: {stderr}") + except subprocess.TimeoutExpired as ex: + raise OMCSessionException(f"Timeout running model executable {repr(cmdl)}") from ex + except subprocess.CalledProcessError as ex: + raise OMCSessionException(f"Error running model executable {repr(cmdl)}") from ex + + return returncode + def execute(self, command: str): warnings.warn("This function is depreciated and will be removed in future versions; " "please use sendExpression() instead", DeprecationWarning, stacklevel=2) @@ -329,8 +651,11 @@ def execute(self, command: str): return self.sendExpression(command, parsed=False) def sendExpression(self, command: str, parsed: bool = True) -> Any: + """ + Send an expression to the OMC server and return the result. + """ if self.omc_zmq is None: - raise OMCSessionException("No OMC running. Create a new instance of OMCSessionZMQ!") + raise OMCSessionException("No OMC running. Create a new instance of OMCProcess!") logger.debug("sendExpression(%r, parsed=%r)", command, parsed) @@ -425,7 +750,7 @@ def sendExpression(self, command: str, parsed: bool = True) -> Any: raise OMCSessionException("Cannot parse OMC result") from ex -class OMCProcess: +class OMCProcess(metaclass=abc.ABCMeta): def __init__( self, @@ -507,6 +832,15 @@ def _get_portfile_path(self) -> Optional[pathlib.Path]: return portfile_path + @abc.abstractmethod + def omc_run_data_update(self, omc_run_data: OMCSessionRunData) -> OMCSessionRunData: + """ + Update the OMCSessionRunData object based on the selected OMCProcess implementation. + + Needs to be implemented in the subclasses. + """ + raise NotImplementedError("This method must be implemented in subclasses!") + class OMCProcessPort(OMCProcess): @@ -517,13 +851,19 @@ def __init__( super().__init__() self._omc_port = omc_port + def omc_run_data_update(self, omc_run_data: OMCSessionRunData) -> OMCSessionRunData: + """ + Update the OMCSessionRunData object based on the selected OMCProcess implementation. + """ + raise OMCSessionException("OMCProcessPort does not support omc_run_data_update()!") + class OMCProcessLocal(OMCProcess): def __init__( self, timeout: float = 10.00, - omhome: Optional[str] = None, + omhome: Optional[str | os.PathLike] = None, ) -> None: super().__init__(timeout=timeout) @@ -536,7 +876,7 @@ def __init__( self._omc_port = self._omc_port_get() @staticmethod - def _omc_home_get(omhome: Optional[str] = None) -> pathlib.Path: + def _omc_home_get(omhome: Optional[str | os.PathLike] = None) -> pathlib.Path: # use the provided path if omhome is not None: return pathlib.Path(omhome) @@ -598,6 +938,48 @@ def _omc_port_get(self) -> str: return port + def omc_run_data_update(self, omc_run_data: OMCSessionRunData) -> OMCSessionRunData: + """ + Update the OMCSessionRunData object based on the selected OMCProcess implementation. + """ + # create a copy of the data + omc_run_data_copy = dataclasses.replace(omc_run_data) + + # as this is the local implementation, pathlib.Path can be used + cmd_path = pathlib.Path(omc_run_data_copy.cmd_path) + + if platform.system() == "Windows": + path_dll = "" + + # set the process environment from the generated .bat file in windows which should have all the dependencies + path_bat = cmd_path / f"{omc_run_data.cmd_model_name}.bat" + if not path_bat.is_file(): + raise OMCSessionException("Batch file (*.bat) does not exist " + str(path_bat)) + + content = path_bat.read_text(encoding='utf-8') + for line in content.splitlines(): + match = re.match(r"^SET PATH=([^%]*)", line, re.IGNORECASE) + if match: + path_dll = match.group(1).strip(';') # Remove any trailing semicolons + my_env = os.environ.copy() + my_env["PATH"] = path_dll + os.pathsep + my_env["PATH"] + + omc_run_data_copy.cmd_library_path = path_dll + + cmd_model_executable = cmd_path / f"{omc_run_data_copy.cmd_model_name}.exe" + else: + # for Linux the paths to the needed libraries should be included in the executable (using rpath) + cmd_model_executable = cmd_path / omc_run_data_copy.cmd_model_name + + if not cmd_model_executable.is_file(): + raise OMCSessionException(f"Application file path not found: {cmd_model_executable}") + omc_run_data_copy.cmd_model_executable = cmd_model_executable.as_posix() + + # define local(!) working directory + omc_run_data_copy.cmd_cwd_local = omc_run_data.cmd_path + + return omc_run_data_copy + class OMCProcessDockerHelper(OMCProcess): @@ -605,7 +987,7 @@ def __init__( self, timeout: float = 10.00, dockerExtraArgs: Optional[list] = None, - dockerOpenModelicaPath: str = "omc", + dockerOpenModelicaPath: str | os.PathLike = "omc", dockerNetwork: Optional[str] = None, port: Optional[int] = None, ) -> None: @@ -615,7 +997,7 @@ def __init__( dockerExtraArgs = [] self._dockerExtraArgs = dockerExtraArgs - self._dockerOpenModelicaPath = dockerOpenModelicaPath + self._dockerOpenModelicaPath = pathlib.PurePosixPath(dockerOpenModelicaPath) self._dockerNetwork = dockerNetwork self._interactivePort = port @@ -704,6 +1086,28 @@ def get_docker_container_id(self) -> str: return self._dockerCid + def omc_run_data_update(self, omc_run_data: OMCSessionRunData) -> OMCSessionRunData: + """ + Update the OMCSessionRunData object based on the selected OMCProcess implementation. + """ + omc_run_data_copy = dataclasses.replace(omc_run_data) + + omc_run_data_copy.cmd_prefix = ( + [ + "docker", "exec", + "--user", str(self._getuid()), + "--workdir", omc_run_data_copy.cmd_path, + ] + + self._dockerExtraArgs + + [self._dockerCid] + ) + + cmd_path = pathlib.PurePosixPath(omc_run_data_copy.cmd_path) + cmd_model_executable = cmd_path / omc_run_data_copy.cmd_model_name + omc_run_data_copy.cmd_model_executable = cmd_model_executable.as_posix() + + return omc_run_data_copy + class OMCProcessDocker(OMCProcessDockerHelper): @@ -712,7 +1116,7 @@ def __init__( timeout: float = 10.00, docker: Optional[str] = None, dockerExtraArgs: Optional[list] = None, - dockerOpenModelicaPath: str = "omc", + dockerOpenModelicaPath: str | os.PathLike = "omc", dockerNetwork: Optional[str] = None, port: Optional[int] = None, ) -> None: @@ -796,7 +1200,7 @@ def _docker_omc_cmd( ] + self._dockerExtraArgs + dockerNetworkStr - + [self._docker, self._dockerOpenModelicaPath] + + [self._docker, self._dockerOpenModelicaPath.as_posix()] + omc_path_and_args_list + extraFlags) @@ -853,7 +1257,7 @@ def __init__( timeout: float = 10.00, dockerContainer: Optional[str] = None, dockerExtraArgs: Optional[list] = None, - dockerOpenModelicaPath: str = "omc", + dockerOpenModelicaPath: str | os.PathLike = "omc", dockerNetwork: Optional[str] = None, port: Optional[int] = None, ) -> None: @@ -904,7 +1308,7 @@ def _docker_omc_cmd(self, omc_path_and_args_list) -> list: "--user", str(self._getuid()), ] + self._dockerExtraArgs - + [self._dockerCid, self._dockerOpenModelicaPath] + + [self._dockerCid, self._dockerOpenModelicaPath.as_posix()] + omc_path_and_args_list + extraFlags) @@ -947,25 +1351,33 @@ def __init__( super().__init__(timeout=timeout) - # get wsl base command - self._wsl_cmd = ['wsl'] - if isinstance(wsl_distribution, str): - self._wsl_cmd += ['--distribution', wsl_distribution] - if isinstance(wsl_user, str): - self._wsl_cmd += ['--user', wsl_user] - self._wsl_cmd += ['--'] - # where to find OpenModelica self._wsl_omc = wsl_omc + # store WSL distribution and user + self._wsl_distribution = wsl_distribution + self._wsl_user = wsl_user # start up omc executable, which is waiting for the ZMQ connection self._omc_process = self._omc_process_get() # connect to the running omc instance using ZMQ self._omc_port = self._omc_port_get() + def _wsl_cmd(self, wsl_cwd: Optional[str] = None) -> list[str]: + # get wsl base command + wsl_cmd = ['wsl'] + if isinstance(self._wsl_distribution, str): + wsl_cmd += ['--distribution', self._wsl_distribution] + if isinstance(self._wsl_user, str): + wsl_cmd += ['--user', self._wsl_user] + if isinstance(wsl_cwd, str): + wsl_cmd += ['--cd', wsl_cwd] + wsl_cmd += ['--'] + + return wsl_cmd + def _omc_process_get(self) -> subprocess.Popen: my_env = os.environ.copy() - omc_command = self._wsl_cmd + [ + omc_command = self._wsl_cmd() + [ self._wsl_omc, "--locale=C", "--interactive=zmq", @@ -988,7 +1400,7 @@ def _omc_port_get(self) -> str: omc_portfile_path = self._get_portfile_path() if omc_portfile_path is not None: output = subprocess.check_output( - args=self._wsl_cmd + ["cat", omc_portfile_path.as_posix()], + args=self._wsl_cmd() + ["cat", omc_portfile_path.as_posix()], stderr=subprocess.DEVNULL, ) port = output.decode().strip() @@ -1009,3 +1421,17 @@ def _omc_port_get(self) -> str: f"pid={self._omc_process.pid if isinstance(self._omc_process, subprocess.Popen) else '?'}") return port + + def omc_run_data_update(self, omc_run_data: OMCSessionRunData) -> OMCSessionRunData: + """ + Update the OMCSessionRunData object based on the selected OMCProcess implementation. + """ + omc_run_data_copy = dataclasses.replace(omc_run_data) + + omc_run_data_copy.cmd_prefix = self._wsl_cmd(wsl_cwd=omc_run_data.cmd_path) + + cmd_path = pathlib.PurePosixPath(omc_run_data_copy.cmd_path) + cmd_model_executable = cmd_path / omc_run_data_copy.cmd_model_name + omc_run_data_copy.cmd_model_executable = cmd_model_executable.as_posix() + + return omc_run_data_copy diff --git a/OMPython/__init__.py b/OMPython/__init__.py index 1da0a0a3..649b3e60 100644 --- a/OMPython/__init__.py +++ b/OMPython/__init__.py @@ -36,8 +36,9 @@ CONDITIONS OF OSMC-PL. """ -from OMPython.ModelicaSystem import LinearizationResult, ModelicaSystem, ModelicaSystemCmd, ModelicaSystemError -from OMPython.OMCSession import (OMCSessionCmd, OMCSessionException, OMCSessionZMQ, +from OMPython.ModelicaSystem import (LinearizationResult, ModelicaSystem, ModelicaSystemCmd, ModelicaSystemDoE, + ModelicaSystemError) +from OMPython.OMCSession import (OMCSessionCmd, OMCSessionException, OMCSessionRunData, OMCSessionZMQ, OMCProcessPort, OMCProcessLocal, OMCProcessDocker, OMCProcessDockerContainer, OMCProcessWSL) @@ -46,10 +47,12 @@ 'LinearizationResult', 'ModelicaSystem', 'ModelicaSystemCmd', + 'ModelicaSystemDoE', 'ModelicaSystemError', 'OMCSessionCmd', 'OMCSessionException', + 'OMCSessionRunData', 'OMCSessionZMQ', 'OMCProcessPort', 'OMCProcessLocal', diff --git a/tests/test_FMIExport.py b/tests/test_FMIExport.py index f47b87ae..b8305b31 100644 --- a/tests/test_FMIExport.py +++ b/tests/test_FMIExport.py @@ -1,12 +1,13 @@ import OMPython import shutil import os +import pathlib def test_CauerLowPassAnalog(): mod = OMPython.ModelicaSystem(modelName="Modelica.Electrical.Analog.Examples.CauerLowPassAnalog", lmodel=["Modelica"]) - tmp = mod.getWorkDirectory() + tmp = pathlib.Path(mod.getWorkDirectory()) try: fmu = mod.convertMo2Fmu(fileNamePrefix="CauerLowPassAnalog") assert os.path.exists(fmu) @@ -16,7 +17,7 @@ def test_CauerLowPassAnalog(): def test_DrumBoiler(): mod = OMPython.ModelicaSystem(modelName="Modelica.Fluid.Examples.DrumBoiler.DrumBoiler", lmodel=["Modelica"]) - tmp = mod.getWorkDirectory() + tmp = pathlib.Path(mod.getWorkDirectory()) try: fmu = mod.convertMo2Fmu(fileNamePrefix="DrumBoiler") assert os.path.exists(fmu) diff --git a/tests/test_ModelicaSystem.py b/tests/test_ModelicaSystem.py index 8e9b8a8e..4b4f8c51 100644 --- a/tests/test_ModelicaSystem.py +++ b/tests/test_ModelicaSystem.py @@ -2,20 +2,36 @@ import os import pathlib import pytest +import sys import tempfile import numpy as np +skip_on_windows = pytest.mark.skipif( + sys.platform.startswith("win"), + reason="OpenModelica Docker image is Linux-only; skipping on Windows.", +) + +skip_python_older_312 = pytest.mark.skipif( + sys.version_info < (3, 12), + reason="OMCPath(non-local) only working for Python >= 3.12.", +) + @pytest.fixture -def model_firstorder(tmp_path): - mod = tmp_path / "M.mo" - mod.write_text("""model M +def model_firstorder_content(): + return ("""model M Real x(start = 1, fixed = true); parameter Real a = -1; equation der(x) = x*a; end M; """) + + +@pytest.fixture +def model_firstorder(tmp_path, model_firstorder_content): + mod = tmp_path / "M.mo" + mod.write_text(model_firstorder_content) return mod @@ -105,16 +121,40 @@ def test_customBuildDirectory(tmp_path, model_firstorder): tmpdir = tmp_path / "tmpdir1" tmpdir.mkdir() m = OMPython.ModelicaSystem(filePath, "M", customBuildDirectory=tmpdir) - assert m.getWorkDirectory().resolve() == tmpdir.resolve() + assert pathlib.Path(m.getWorkDirectory()).resolve() == tmpdir.resolve() result_file = tmpdir / "a.mat" assert not result_file.exists() m.simulate(resultfile="a.mat") assert result_file.is_file() +@skip_on_windows +@skip_python_older_312 +def test_getSolutions_docker(model_firstorder_content): + omcp = OMPython.OMCProcessDocker(docker="openmodelica/openmodelica:v1.25.0-minimal") + omc = OMPython.OMCSessionZMQ(omc_process=omcp) + + modelpath = omc.omcpath_tempdir() / 'M.mo' + modelpath.write_text(model_firstorder_content) + + file_path = pathlib.Path(modelpath) + mod = OMPython.ModelicaSystem( + fileName=file_path, + modelName="M", + omc_process=omc.omc_process, + ) + + _run_getSolutions(mod) + + def test_getSolutions(model_firstorder): filePath = model_firstorder.as_posix() mod = OMPython.ModelicaSystem(filePath, "M") + + _run_getSolutions(mod) + + +def _run_getSolutions(mod): x0 = 1 a = -1 tau = -1 / a diff --git a/tests/test_ModelicaSystemCmd.py b/tests/test_ModelicaSystemCmd.py index 3b28699c..88e4fc29 100644 --- a/tests/test_ModelicaSystemCmd.py +++ b/tests/test_ModelicaSystemCmd.py @@ -18,7 +18,11 @@ def model_firstorder(tmp_path): @pytest.fixture def mscmd_firstorder(model_firstorder): mod = OMPython.ModelicaSystem(fileName=model_firstorder.as_posix(), modelName="M") - mscmd = OMPython.ModelicaSystemCmd(runpath=mod.getWorkDirectory(), modelname=mod._model_name) + mscmd = OMPython.ModelicaSystemCmd( + session=mod.get_session(), + runpath=mod.getWorkDirectory(), + modelname=mod._model_name, + ) return mscmd @@ -32,8 +36,7 @@ def test_simflags(mscmd_firstorder): with pytest.deprecated_call(): mscmd.args_set(args=mscmd.parse_simflags(simflags="-noEventEmit -noRestart -override=a=1,x=3")) - assert mscmd.get_cmd() == [ - mscmd.get_exe().as_posix(), + assert mscmd.get_cmd_args() == [ '-noEventEmit', '-override=b=2,a=1,x=3', '-noRestart', diff --git a/tests/test_ModelicaSystemDoE.py b/tests/test_ModelicaSystemDoE.py new file mode 100644 index 00000000..a5ab37e1 --- /dev/null +++ b/tests/test_ModelicaSystemDoE.py @@ -0,0 +1,148 @@ +import numpy as np +import OMPython +import pathlib +import pytest +import sys + +skip_on_windows = pytest.mark.skipif( + sys.platform.startswith("win"), + reason="OpenModelica Docker image is Linux-only; skipping on Windows.", +) + +skip_python_older_312 = pytest.mark.skipif( + sys.version_info < (3, 12), + reason="OMCPath(non-local) only working for Python >= 3.12.", +) + + +@pytest.fixture +def model_doe(tmp_path: pathlib.Path) -> pathlib.Path: + # see: https://trac.openmodelica.org/OpenModelica/ticket/4052 + mod = tmp_path / "M.mo" + # TODO: update for bool and string parameters; check if these can be used in DoE + mod.write_text(""" +model M + parameter Integer p=1; + parameter Integer q=1; + parameter Real a = -1; + parameter Real b = -1; + Real x[p]; + Real y[q]; +equation + der(x) = a * fill(1.0, p); + der(y) = b * fill(1.0, q); +end M; +""") + return mod + + +@pytest.fixture +def param_doe() -> dict[str, list]: + param = { + # structural + 'p': [1, 2], + 'q': [3, 4], + # simple + 'a': [5, 6], + 'b': [7, 8], + } + return param + + +def test_ModelicaSystemDoE_local(tmp_path, model_doe, param_doe): + tmpdir = tmp_path / 'DoE' + tmpdir.mkdir(exist_ok=True) + + doe_mod = OMPython.ModelicaSystemDoE( + fileName=model_doe.as_posix(), + modelName="M", + parameters=param_doe, + resultpath=tmpdir, + simargs={"override": {'stopTime': 1.0}}, + ) + + _run_ModelicaSystemDoe(doe_mod=doe_mod) + + +@skip_on_windows +@skip_python_older_312 +def test_ModelicaSystemDoE_docker(tmp_path, model_doe, param_doe): + omcp = OMPython.OMCProcessDocker(docker="openmodelica/openmodelica:v1.25.0-minimal") + omc = OMPython.OMCSessionZMQ(omc_process=omcp) + assert omc.sendExpression("getVersion()") == "OpenModelica 1.25.0" + + modelpath = omc.omcpath_tempdir() / 'M.mo' + modelpath.write_text(model_doe.read_text()) + + doe_mod = OMPython.ModelicaSystemDoE( + fileName=modelpath.as_posix(), + modelName="M", + parameters=param_doe, + omc_process=omcp, + resultpath=modelpath.parent, + simargs={"override": {'stopTime': 1.0}}, + ) + + _run_ModelicaSystemDoe(doe_mod=doe_mod) + + +@pytest.mark.skip(reason="Not able to run WSL on github") +@skip_python_older_312 +def test_ModelicaSystemDoE_WSL(tmp_path, model_doe, param_doe): + tmpdir = tmp_path / 'DoE' + tmpdir.mkdir(exist_ok=True) + + doe_mod = OMPython.ModelicaSystemDoE( + fileName=model_doe.as_posix(), + modelName="M", + parameters=param_doe, + resultpath=tmpdir, + simargs={"override": {'stopTime': 1.0}}, + ) + + _run_ModelicaSystemDoe(doe_mod=doe_mod) + + +def _run_ModelicaSystemDoe(doe_mod): + doe_count = doe_mod.prepare() + assert doe_count == 16 + + doe_def = doe_mod.get_doe_definition() + assert isinstance(doe_def, dict) + assert len(doe_def.keys()) == doe_count + + doe_cmd = doe_mod.get_doe_command() + assert isinstance(doe_cmd, dict) + assert len(doe_cmd.keys()) == doe_count + + doe_status = doe_mod.simulate() + assert doe_status is True + + doe_sol = doe_mod.get_doe_solutions() + assert isinstance(doe_sol, dict) + assert len(doe_sol.keys()) == doe_count + + assert sorted(doe_def.keys()) == sorted(doe_cmd.keys()) + assert sorted(doe_cmd.keys()) == sorted(doe_sol.keys()) + + for resultfilename in doe_def: + row = doe_def[resultfilename] + + assert resultfilename in doe_sol + sol = doe_sol[resultfilename] + + var_dict = { + # simple / non-structural parameters + 'a': float(row['a']), + 'b': float(row['b']), + # structural parameters + 'p': float(row['p']), + 'q': float(row['q']), + # variables using the structural parameters + f"x[{row['p']}]": float(row['a']), + f"y[{row['p']}]": float(row['b']), + } + + for var in var_dict: + assert var in sol['data'] + assert np.isclose(sol['data'][var][-1], var_dict[var]) diff --git a/tests/test_OMCPath.py b/tests/test_OMCPath.py new file mode 100644 index 00000000..b8e937f3 --- /dev/null +++ b/tests/test_OMCPath.py @@ -0,0 +1,78 @@ +import sys +import OMPython +import pytest + +skip_on_windows = pytest.mark.skipif( + sys.platform.startswith("win"), + reason="OpenModelica Docker image is Linux-only; skipping on Windows.", +) + +skip_python_older_312 = pytest.mark.skipif( + sys.version_info < (3, 12), + reason="OMCPath(non-local) only working for Python >= 3.12.", +) + + +def test_OMCPath_OMCSessionZMQ(): + om = OMPython.OMCSessionZMQ() + + _run_OMCPath_checks(om) + + del om + + +def test_OMCPath_OMCProcessLocal(): + omp = OMPython.OMCProcessLocal() + om = OMPython.OMCSessionZMQ(omc_process=omp) + + _run_OMCPath_checks(om) + + del om + + +@skip_on_windows +@skip_python_older_312 +def test_OMCPath_OMCProcessDocker(): + omcp = OMPython.OMCProcessDocker(docker="openmodelica/openmodelica:v1.25.0-minimal") + om = OMPython.OMCSessionZMQ(omc_process=omcp) + assert om.sendExpression("getVersion()") == "OpenModelica 1.25.0" + + _run_OMCPath_checks(om) + + del omcp + del om + + +@pytest.mark.skip(reason="Not able to run WSL on github") +@skip_python_older_312 +def test_OMCPath_OMCProcessWSL(): + omcp = OMPython.OMCProcessWSL( + wsl_omc='omc', + wsl_user='omc', + timeout=30.0, + ) + om = OMPython.OMCSessionZMQ(omc_process=omcp) + + _run_OMCPath_checks(om) + + del omcp + del om + + +def _run_OMCPath_checks(om: OMPython.OMCSessionZMQ): + p1 = om.omcpath_tempdir() + p2 = p1 / 'test' + p2.mkdir() + assert p2.is_dir() + p3 = p2 / '..' / p2.name / 'test.txt' + assert p3.is_file() is False + assert p3.write_text('test') + assert p3.is_file() + assert p3.size() > 0 + p3 = p3.resolve().absolute() + assert str(p3) == str((p2 / 'test.txt').resolve().absolute()) + assert p3.read_text() == "test" + assert p3.is_file() + assert p3.parent.is_dir() + p3.unlink() + assert p3.is_file() is False diff --git a/tests/test_OMSessionCmd.py b/tests/test_OMSessionCmd.py index 1588fac8..15ed1d49 100644 --- a/tests/test_OMSessionCmd.py +++ b/tests/test_OMSessionCmd.py @@ -10,7 +10,7 @@ def test_isPackage(): def test_isPackage2(): mod = OMPython.ModelicaSystem(modelName="Modelica.Electrical.Analog.Examples.CauerLowPassAnalog", lmodel=["Modelica"]) - omccmd = OMPython.OMCSessionCmd(session=mod._getconn) + omccmd = OMPython.OMCSessionCmd(session=mod.get_session()) assert omccmd.isPackage('Modelica') diff --git a/tests/test_optimization.py b/tests/test_optimization.py index b4164397..6c6cae60 100644 --- a/tests/test_optimization.py +++ b/tests/test_optimization.py @@ -47,7 +47,9 @@ def test_optimization_example(tmp_path): r = mod.optimize() # it is necessary to specify resultfile, otherwise it wouldn't find it. - time, f, v = mod.getSolutions(["time", "f", "v"], resultfile=r["resultFile"]) + resultfile_str = r["resultFile"] + resultfile_omcpath = mod.get_session().omcpath(resultfile_str) + time, f, v = mod.getSolutions(["time", "f", "v"], resultfile=resultfile_omcpath.as_posix()) assert np.isclose(f[0], 10) assert np.isclose(f[-1], -10)