diff --git a/OMPython/ModelicaSystem.py b/OMPython/ModelicaSystem.py index ae480dde..17652828 100644 --- a/OMPython/ModelicaSystem.py +++ b/OMPython/ModelicaSystem.py @@ -33,23 +33,20 @@ """ import ast -import csv from dataclasses import dataclass import logging import numbers import numpy as np import os -import pathlib import platform import re import subprocess -import tempfile import textwrap from typing import Optional, Any import warnings import xml.etree.ElementTree as ET -from OMPython.OMCSession import OMCSessionException, OMCSessionZMQ, OMCProcessLocal +from OMPython.OMCSession import OMCSessionException, OMCSessionZMQ, OMCProcessLocal, OMCPath # define logger using the current module name as ID logger = logging.getLogger(__name__) @@ -115,8 +112,8 @@ def __getitem__(self, index: int): class ModelicaSystemCmd: """A compiled model executable.""" - def __init__(self, runpath: pathlib.Path, modelname: str, timeout: Optional[float] = None) -> None: - self._runpath = pathlib.Path(runpath).resolve().absolute() + def __init__(self, runpath: OMCPath, modelname: str, timeout: Optional[float] = None) -> None: + self._runpath = runpath self._model_name = modelname self._timeout = timeout self._args: dict[str, str | None] = {} @@ -176,7 +173,7 @@ def args_set(self, args: dict[str, Optional[str | dict[str, str]]]) -> None: for arg in args: self.arg_set(key=arg, val=args[arg]) - def get_exe(self) -> pathlib.Path: + def get_exe(self) -> OMCPath: """Get the path to the compiled model executable.""" if platform.system() == "Windows": path_exe = self._runpath / f"{self._model_name}.exe" @@ -296,12 +293,12 @@ def parse_simflags(simflags: str) -> dict[str, Optional[str | dict[str, str]]]: class ModelicaSystem: def __init__( self, - fileName: Optional[str | os.PathLike | pathlib.Path] = None, + fileName: Optional[str | os.PathLike] = None, modelName: Optional[str] = None, lmodel: Optional[list[str | tuple[str, str]]] = None, commandLineOptions: Optional[str] = None, variableFilter: Optional[str] = None, - customBuildDirectory: Optional[str | os.PathLike | pathlib.Path] = None, + customBuildDirectory: Optional[str | os.PathLike] = None, omhome: Optional[str] = None, omc_process: Optional[OMCProcessLocal] = None, build: bool = True, @@ -383,12 +380,15 @@ def __init__( if not isinstance(lmodel, list): raise ModelicaSystemError(f"Invalid input type for lmodel: {type(lmodel)} - list expected!") - self._xml_file = None self._lmodel = lmodel # may be needed if model is derived from other model self._model_name = modelName # Model class name - self._file_name = pathlib.Path(fileName).resolve() if fileName is not None else None # Model file/package name + if fileName is not None: + file_name = self._getconn.omcpath(fileName).resolve() + else: + file_name = None + self._file_name: Optional[OMCPath] = file_name # Model file/package name self._simulated = False # True if the model has already been simulated - self._result_file: Optional[pathlib.Path] = None # for storing result file + self._result_file: Optional[OMCPath] = None # for storing result file self._variable_filter = variableFilter if self._file_name is not None and not self._file_name.is_file(): # if file does not exist @@ -400,7 +400,7 @@ def __init__( self.setCommandLineOptions("--linearizationDumpLanguage=python") self.setCommandLineOptions("--generateSymbolicLinearization") - self._tempdir = self.setTempDirectory(customBuildDirectory) + self._work_dir: OMCPath = self.setWorkDirectory(customBuildDirectory) if self._file_name is not None: self._loadLibrary(lmodel=self._lmodel) @@ -420,7 +420,7 @@ def setCommandLineOptions(self, commandLineOptions: Optional[str] = None): exp = f'setCommandLineOptions("{commandLineOptions}")' self.sendExpression(exp) - def _loadFile(self, fileName: pathlib.Path): + def _loadFile(self, fileName: OMCPath): # load file self.sendExpression(f'loadFile("{fileName.as_posix()}")') @@ -448,25 +448,34 @@ def _loadLibrary(self, lmodel: list): '1)["Modelica"]\n' '2)[("Modelica","3.2.3"), "PowerSystems"]\n') - def setTempDirectory(self, customBuildDirectory: Optional[str | os.PathLike | pathlib.Path] = None) -> pathlib.Path: - # create a unique temp directory for each session and build the model in that directory + def setWorkDirectory(self, customBuildDirectory: Optional[str | os.PathLike] = None) -> OMCPath: + """ + Define the work directory for the ModelicaSystem / OpenModelica session. The model is build within this + directory. If no directory is defined a unique temporary directory is created. + """ if customBuildDirectory is not None: - if not os.path.exists(customBuildDirectory): - raise IOError(f"{customBuildDirectory} does not exist") - tempdir = pathlib.Path(customBuildDirectory).absolute() + workdir = self._getconn.omcpath(customBuildDirectory).absolute() + if not workdir.is_dir(): + raise IOError(f"Provided work directory does not exists: {customBuildDirectory}!") else: - tempdir = pathlib.Path(tempfile.mkdtemp()).absolute() - if not tempdir.is_dir(): - raise IOError(f"{tempdir} could not be created") + workdir = self._getconn.omcpath_tempdir().absolute() + if not workdir.is_dir(): + raise IOError(f"{workdir} could not be created") - logger.info("Define tempdir as %s", tempdir) - exp = f'cd("{tempdir.as_posix()}")' + logger.info("Define work dir as %s", workdir) + exp = f'cd("{workdir.as_posix()}")' self.sendExpression(exp) - return tempdir + # set the class variable _tempdir ... + self._work_dir = workdir + # ... and also return the defined path + return workdir - def getWorkDirectory(self) -> pathlib.Path: - return self._tempdir + def getWorkDirectory(self) -> OMCPath: + """ + Return the defined working directory for this ModelicaSystem / OpenModelica session. + """ + return self._work_dir def buildModel(self, variableFilter: Optional[str] = None): if variableFilter is not None: @@ -480,8 +489,8 @@ def buildModel(self, variableFilter: Optional[str] = None): buildModelResult = self._requestApi("buildModel", self._model_name, properties=varFilter) logger.debug("OM model build result: %s", buildModelResult) - self._xml_file = pathlib.Path(buildModelResult[0]).parent / buildModelResult[1] - self._xmlparse() + xml_file = self._getconn.omcpath(buildModelResult[0]).parent / buildModelResult[1] + self._xmlparse(xml_file=xml_file) def sendExpression(self, expr: str, parsed: bool = True): try: @@ -507,23 +516,34 @@ def _requestApi(self, apiName, entity=None, properties=None): # 2 return self.sendExpression(exp) - def _xmlparse(self): - if not self._xml_file.is_file(): - raise ModelicaSystemError(f"XML file not generated: {self._xml_file}") + def _xmlparse(self, xml_file: OMCPath): + if not xml_file.is_file(): + raise ModelicaSystemError(f"XML file not generated: {xml_file}") - tree = ET.parse(self._xml_file) + xml_content = xml_file.read_text() + tree = ET.ElementTree(ET.fromstring(xml_content)) rootCQ = tree.getroot() for attr in rootCQ.iter('DefaultExperiment'): for key in ("startTime", "stopTime", "stepSize", "tolerance", "solver", "outputFormat"): - self._simulate_options[key] = attr.get(key) + self._simulate_options[key] = str(attr.get(key)) for sv in rootCQ.iter('ScalarVariable'): - scalar = {} - for key in ("name", "description", "variability", "causality", "alias"): - scalar[key] = sv.get(key) - scalar["changeable"] = sv.get('isValueChangeable') - scalar["aliasvariable"] = sv.get('aliasVariable') + translations = { + "alias": "alias", + "aliasvariable": "aliasVariable", + "causality": "causality", + "changeable": "isValueChangeable", + "description": "description", + "name": "name", + "variability": "variability", + } + + scalar: dict[str, Any] = {} + for key_dst, key_src in translations.items(): + val = sv.get(key_src) + scalar[key_dst] = None if val is None else str(val) + ch = list(sv) for att in ch: scalar["start"] = att.get('start') @@ -531,6 +551,7 @@ def _xmlparse(self): scalar["max"] = att.get('max') scalar["unit"] = att.get('unit') + # save parameters in the corresponding class variables if scalar["variability"] == "parameter": if scalar["name"] in self._override_variables: self._params[scalar["name"]] = self._override_variables[scalar["name"]] @@ -915,7 +936,7 @@ def getOptimizationOptions(self, names: Optional[str | list[str]] = None) -> dic def simulate_cmd( self, - result_file: pathlib.Path, + result_file: OMCPath, simflags: Optional[str] = None, simargs: Optional[dict[str, Optional[str | dict[str, str]]]] = None, timeout: Optional[float] = None, @@ -942,7 +963,11 @@ def simulate_cmd( An instance if ModelicaSystemCmd to run the requested simulation. """ - om_cmd = ModelicaSystemCmd(runpath=self._tempdir, modelname=self._model_name, timeout=timeout) + om_cmd = ModelicaSystemCmd( + runpath=self.getWorkDirectory(), + modelname=self._model_name, + timeout=timeout, + ) # always define the result file to use om_cmd.arg_set(key="r", val=result_file.as_posix()) @@ -954,16 +979,17 @@ def simulate_cmd( if simargs: om_cmd.args_set(args=simargs) - overrideFile = self._tempdir / f"{self._model_name}_override.txt" if self._override_variables or self._simulate_options_override: - tmpdict = self._override_variables.copy() - tmpdict.update(self._simulate_options_override) - # write to override file - with open(file=overrideFile, mode="w", encoding="utf-8") as fh: - for key, value in tmpdict.items(): - fh.write(f"{key}={value}\n") + override_file = result_file.parent / f"{result_file.stem}_override.txt" - om_cmd.arg_set(key="overrideFile", val=overrideFile.as_posix()) + override_content = ( + "\n".join([f"{key}={value}" for key, value in self._override_variables.items()]) + + "\n".join([f"{key}={value}" for key, value in self._simulate_options_override.items()]) + + "\n" + ) + + override_file.write_text(override_content) + om_cmd.arg_set(key="overrideFile", val=override_file.as_posix()) if self._inputs: # if model has input quantities for key in self._inputs: @@ -1013,11 +1039,16 @@ def simulate( if resultfile is None: # default result file generated by OM - self._result_file = self._tempdir / f"{self._model_name}_res.mat" - elif os.path.exists(resultfile): - self._result_file = pathlib.Path(resultfile) + self._result_file = self.getWorkDirectory() / f"{self._model_name}_res.mat" + elif isinstance(resultfile, OMCPath): + self._result_file = resultfile else: - self._result_file = self._tempdir / resultfile + self._result_file = self._getconn.omcpath(resultfile) + if not self._result_file.is_absolute(): + self._result_file = self.getWorkDirectory() / resultfile + + if not isinstance(self._result_file, OMCPath): + raise ModelicaSystemError(f"Invalid result file path: {self._result_file} - must be an OMCPath object!") om_cmd = self.simulate_cmd( result_file=self._result_file, @@ -1036,7 +1067,7 @@ def simulate( # check for an empty (=> 0B) result file which indicates a crash of the model executable # see: https://github.com/OpenModelica/OMPython/issues/261 # https://github.com/OpenModelica/OpenModelica/issues/13829 - if self._result_file.stat().st_size == 0: + if self._result_file.size() == 0: self._result_file.unlink() raise ModelicaSystemError("Empty result file - this indicates a crash of the model executable!") @@ -1044,7 +1075,11 @@ def simulate( self._simulated = True - def getSolutions(self, varList: Optional[str | list[str]] = None, resultfile: Optional[str] = None) -> tuple[str] | np.ndarray: + def getSolutions( + self, + varList: Optional[str | list[str]] = None, + resultfile: Optional[str | os.PathLike] = None, + ) -> tuple[str] | np.ndarray: """Extract simulation results from a result data file. Args: @@ -1081,7 +1116,7 @@ def getSolutions(self, varList: Optional[str | list[str]] = None, resultfile: Op raise ModelicaSystemError("No result file found. Run simulate() first.") result_file = self._result_file else: - result_file = pathlib.Path(resultfile) + result_file = self._getconn.omcpath(resultfile) # check for result file exits if not result_file.is_file(): @@ -1373,7 +1408,7 @@ def setInputs( return True - def _createCSVData(self, csvfile: Optional[pathlib.Path] = None) -> pathlib.Path: + def _createCSVData(self, csvfile: Optional[OMCPath] = None) -> OMCPath: """ Create a csv file with inputs for the simulation/optimization of the model. If csvfile is provided as argument, this file is used; else a generic file name is created. @@ -1419,11 +1454,12 @@ def _createCSVData(self, csvfile: Optional[pathlib.Path] = None) -> pathlib.Path csv_rows.append(row) if csvfile is None: - csvfile = self._tempdir / f'{self._model_name}.csv' + csvfile = self.getWorkDirectory() / f'{self._model_name}.csv' + + # basic definition of a CSV file using csv_rows as input + csv_content = "\n".join([",".join(map(str, row)) for row in csv_rows]) + "\n" - with open(file=csvfile, mode="w", encoding="utf-8", newline="") as fh: - writer = csv.writer(fh) - writer.writerows(csv_rows) + csvfile.write_text(csv_content) return csvfile @@ -1534,24 +1570,28 @@ def linearize(self, lintime: Optional[float] = None, simflags: Optional[str] = N * `result = linearize(); A = result[0]` mostly just for backwards compatibility, because linearize() used to return `[A, B, C, D]`. """ - - if self._xml_file is None: + if len(self._quantities) == 0: + # if self._quantities has no content, the xml file was not parsed; see self._xmlparse() raise ModelicaSystemError( "Linearization cannot be performed as the model is not build, " "use ModelicaSystem() to build the model first" ) - om_cmd = ModelicaSystemCmd(runpath=self._tempdir, modelname=self._model_name, timeout=timeout) - - overrideLinearFile = self._tempdir / f'{self._model_name}_override_linear.txt' + om_cmd = ModelicaSystemCmd( + runpath=self.getWorkDirectory(), + modelname=self._model_name, + timeout=timeout, + ) - with open(file=overrideLinearFile, mode="w", encoding="utf-8") as fh: - for key, value in self._override_variables.items(): - fh.write(f"{key}={value}\n") - for key, value in self._linearization_options.items(): - fh.write(f"{key}={value}\n") + override_content = ( + "\n".join([f"{key}={value}" for key, value in self._override_variables.items()]) + + "\n".join([f"{key}={value}" for key, value in self._linearization_options.items()]) + + "\n" + ) + override_file = self.getWorkDirectory() / f'{self._model_name}_override_linear.txt' + override_file.write_text(override_content) - om_cmd.arg_set(key="overrideFile", val=overrideLinearFile.as_posix()) + om_cmd.arg_set(key="overrideFile", val=override_file.as_posix()) if self._inputs: for key in self._inputs: @@ -1573,19 +1613,17 @@ def linearize(self, lintime: Optional[float] = None, simflags: Optional[str] = N om_cmd.args_set(args=simargs) # the file create by the model executable which contains the matrix and linear inputs, outputs and states - linear_file = self._tempdir / "linearized_model.py" - + linear_file = self.getWorkDirectory() / "linearized_model.py" linear_file.unlink(missing_ok=True) returncode = om_cmd.run() if returncode != 0: raise ModelicaSystemError(f"Linearize failed with return code: {returncode}") + if not linear_file.is_file(): + raise ModelicaSystemError(f"Linearization failed: {linear_file} not found!") self._simulated = True - if not linear_file.exists(): - raise ModelicaSystemError(f"Linearization failed: {linear_file} not found!") - # extract data from the python file with the linearized model using the ast module - this allows to get the # needed information without executing the created code linear_data = {} diff --git a/OMPython/OMCSession.py b/OMPython/OMCSession.py index ac99dc05..d5badb4b 100644 --- a/OMPython/OMCSession.py +++ b/OMPython/OMCSession.py @@ -268,6 +268,191 @@ def getClassNames(self, className=None, recursive=False, qualified=False, sort=F return self._ask(question='getClassNames', opt=opt) +class OMCPathReal(pathlib.PurePosixPath): + """ + Implementation of a basic Path object which uses OMC as backend. The connection to OMC is provided via a + OMCSessionZMQ session object. + """ + + def __init__(self, *path, session: OMCSessionZMQ) -> None: + super().__init__(*path) + self._session = session + + def with_segments(self, *pathsegments): + """ + Create a new OMCPath object with the given path segments. + + The original definition of Path is overridden to ensure session is set. + """ + return type(self)(*pathsegments, session=self._session) + + def is_file(self, *, follow_symlinks=True) -> bool: + """ + Check if the path is a regular file. + """ + return self._session.sendExpression(f'regularFileExists("{self.as_posix()}")') + + def is_dir(self, *, follow_symlinks=True) -> bool: + """ + Check if the path is a directory. + """ + return self._session.sendExpression(f'directoryExists("{self.as_posix()}")') + + def read_text(self, encoding=None, errors=None, newline=None) -> str: + """ + Read the content of the file represented by this path as text. + + The additional arguments `encoding`, `errors` and `newline` are only defined for compatibility with Path() + definition. + """ + return self._session.sendExpression(f'readFile("{self.as_posix()}")') + + def write_text(self, data: str, encoding=None, errors=None, newline=None): + """ + Write text data to the file represented by this path. + + The additional arguments `encoding`, `errors`, and `newline` are only defined for compatibility with Path() + definitions. + """ + if not isinstance(data, str): + raise TypeError('data must be str, not %s' % + data.__class__.__name__) + + return self._session.sendExpression(f'writeFile("{self.as_posix()}", "{data}", false)') + + def mkdir(self, mode=0o777, parents=False, exist_ok=False): + """ + Create a directory at the path represented by this OMCPath object. + + The additional arguments `mode`, and `parents` are only defined for compatibility with Path() definitions. + """ + if self.is_dir() and not exist_ok: + raise FileExistsError(f"Directory {self.as_posix()} already exists!") + + return self._session.sendExpression(f'mkdir("{self.as_posix()}")') + + def cwd(self): + """ + Returns the current working directory as an OMCPath object. + """ + cwd_str = self._session.sendExpression('cd()') + return OMCPath(cwd_str, session=self._session) + + def unlink(self, missing_ok: bool = False) -> None: + """ + Unlink (delete) the file or directory represented by this path. + """ + res = self._session.sendExpression(f'deleteFile("{self.as_posix()}")') + if not res and not missing_ok: + raise FileNotFoundError(f"Cannot delete file {self.as_posix()} - it does not exists!") + + def resolve(self, strict: bool = False): + """ + Resolve the path to an absolute path. This is done based on available OMC functions. + """ + if strict and not (self.is_file() or self.is_dir()): + raise OMCSessionException(f"Path {self.as_posix()} does not exist!") + + if self.is_file(): + omcpath = self._omc_resolve(self.parent.as_posix()) / self.name + elif self.is_dir(): + omcpath = self._omc_resolve(self.as_posix()) + else: + raise OMCSessionException(f"Path {self.as_posix()} is neither a file nor a directory!") + + return omcpath + + def _omc_resolve(self, pathstr: str): + """ + Internal function to resolve the path of the OMCPath object using OMC functions *WITHOUT* changing the cwd + within OMC. + """ + expression = ('omcpath_cwd := cd(); ' + f'omcpath_check := cd("{pathstr}"); ' # check requested pathstring + 'cd(omcpath_cwd)') + + try: + result = self._session.sendExpression(command=expression, parsed=False) + result_parts = result.split('\n') + pathstr_resolved = result_parts[1] + pathstr_resolved = pathstr_resolved[1:-1] # remove quotes + + omcpath_resolved = self._session.omcpath(pathstr_resolved) + except OMCSessionException as ex: + raise OMCSessionException(f"OMCPath resolve failed for {pathstr}!") from ex + + if not omcpath_resolved.is_file() and not omcpath_resolved.is_dir(): + raise OMCSessionException(f"OMCPath resolve failed for {pathstr} - path does not exist!") + + return omcpath_resolved + + def absolute(self): + """ + Resolve the path to an absolute path. This is done by calling resolve() as it is the best we can do + using OMC functions. + """ + return self.resolve(strict=True) + + def exists(self, follow_symlinks=True) -> bool: + """ + Semi replacement for pathlib.Path.exists(). + """ + return self.is_file() or self.is_dir() + + def size(self) -> int: + """ + Get the size of the file in bytes - this is an extra function and the best we can do using OMC. + """ + if not self.is_file(): + raise OMCSessionException(f"Path {self.as_posix()} is not a file!") + + res = self._session.sendExpression(f'stat("{self.as_posix()}")') + if res[0]: + return int(res[1]) + + raise OMCSessionException(f"Error reading file size for path {self.as_posix()}!") + + +if sys.version_info < (3, 12): + + class OMCPathCompatibility(pathlib.Path): + """ + Compatibility class for OMCPath in Python < 3.12. This allows to run all code which uses OMCPath (mainly + ModelicaSystem) on these Python versions. There is one remaining limitation: only OMCProcessLocal will work as + OMCPathCompatibility is based on the standard pathlib.Path implementation. + """ + + # modified copy of pathlib.Path.__new__() definition + def __new__(cls, *args, **kwargs): + logger.warning("Python < 3.12 - using a version of class OMCPath " + "based on pathlib.Path for local usage only.") + + if cls is OMCPathCompatibility: + cls = OMCPathCompatibilityWindows if os.name == 'nt' else OMCPathCompatibilityPosix + self = cls._from_parts(args) + if not self._flavour.is_supported: + raise NotImplementedError("cannot instantiate %r on your system" + % (cls.__name__,)) + return self + + def size(self) -> int: + """ + Needed compatibility function to have the same interface as OMCPathReal + """ + return self.stat().st_size + + class OMCPathCompatibilityPosix(pathlib.PosixPath, OMCPathCompatibility): + pass + + class OMCPathCompatibilityWindows(pathlib.WindowsPath, OMCPathCompatibility): + pass + + OMCPath = OMCPathCompatibility + +else: + OMCPath = OMCPathReal + + class OMCSessionZMQ: def __init__( @@ -322,6 +507,52 @@ def __del__(self): self.omc_zmq = None + def omcpath(self, *path) -> OMCPath: + """ + Create an OMCPath object based on the given path segments and the current OMC session. + """ + + # fallback solution for Python < 3.12; a modified pathlib.Path object is used as OMCPath replacement + if sys.version_info < (3, 12): + if isinstance(self.omc_process, OMCProcessLocal): + # noinspection PyArgumentList + return OMCPath(*path) + else: + raise OMCSessionException("OMCPath is supported for Python < 3.12 only if OMCProcessLocal is used!") + else: + return OMCPath(*path, session=self) + + def omcpath_tempdir(self, tempdir_base: Optional[OMCPath] = None) -> OMCPath: + """ + Get a temporary directory using OMC. It is our own implementation as non-local usage relies on OMC to run all + filesystem related access. + """ + names = [str(uuid.uuid4()) for _ in range(100)] + + if tempdir_base is None: + # fallback solution for Python < 3.12; a modified pathlib.Path object is used as OMCPath replacement + if sys.version_info < (3, 12): + tempdir_str = tempfile.gettempdir() + else: + tempdir_str = self.sendExpression("getTempDirectoryPath()") + tempdir_base = self.omcpath(tempdir_str) + + tempdir: Optional[OMCPath] = None + for name in names: + # create a unique temporary directory name + tempdir = tempdir_base / name + + if tempdir.exists(): + continue + + tempdir.mkdir(parents=True, exist_ok=False) + break + + if tempdir is None or not tempdir.is_dir(): + raise OMCSessionException("Cannot create a temporary directory!") + + return tempdir + def execute(self, command: str): warnings.warn("This function is depreciated and will be removed in future versions; " "please use sendExpression() instead", DeprecationWarning, stacklevel=2) diff --git a/tests/test_FMIExport.py b/tests/test_FMIExport.py index f47b87ae..b8305b31 100644 --- a/tests/test_FMIExport.py +++ b/tests/test_FMIExport.py @@ -1,12 +1,13 @@ import OMPython import shutil import os +import pathlib def test_CauerLowPassAnalog(): mod = OMPython.ModelicaSystem(modelName="Modelica.Electrical.Analog.Examples.CauerLowPassAnalog", lmodel=["Modelica"]) - tmp = mod.getWorkDirectory() + tmp = pathlib.Path(mod.getWorkDirectory()) try: fmu = mod.convertMo2Fmu(fileNamePrefix="CauerLowPassAnalog") assert os.path.exists(fmu) @@ -16,7 +17,7 @@ def test_CauerLowPassAnalog(): def test_DrumBoiler(): mod = OMPython.ModelicaSystem(modelName="Modelica.Fluid.Examples.DrumBoiler.DrumBoiler", lmodel=["Modelica"]) - tmp = mod.getWorkDirectory() + tmp = pathlib.Path(mod.getWorkDirectory()) try: fmu = mod.convertMo2Fmu(fileNamePrefix="DrumBoiler") assert os.path.exists(fmu) diff --git a/tests/test_ModelicaSystem.py b/tests/test_ModelicaSystem.py index 8e9b8a8e..e782489e 100644 --- a/tests/test_ModelicaSystem.py +++ b/tests/test_ModelicaSystem.py @@ -105,7 +105,7 @@ def test_customBuildDirectory(tmp_path, model_firstorder): tmpdir = tmp_path / "tmpdir1" tmpdir.mkdir() m = OMPython.ModelicaSystem(filePath, "M", customBuildDirectory=tmpdir) - assert m.getWorkDirectory().resolve() == tmpdir.resolve() + assert pathlib.Path(m.getWorkDirectory()).resolve() == tmpdir.resolve() result_file = tmpdir / "a.mat" assert not result_file.exists() m.simulate(resultfile="a.mat") diff --git a/tests/test_OMCPath.py b/tests/test_OMCPath.py new file mode 100644 index 00000000..b8e937f3 --- /dev/null +++ b/tests/test_OMCPath.py @@ -0,0 +1,78 @@ +import sys +import OMPython +import pytest + +skip_on_windows = pytest.mark.skipif( + sys.platform.startswith("win"), + reason="OpenModelica Docker image is Linux-only; skipping on Windows.", +) + +skip_python_older_312 = pytest.mark.skipif( + sys.version_info < (3, 12), + reason="OMCPath(non-local) only working for Python >= 3.12.", +) + + +def test_OMCPath_OMCSessionZMQ(): + om = OMPython.OMCSessionZMQ() + + _run_OMCPath_checks(om) + + del om + + +def test_OMCPath_OMCProcessLocal(): + omp = OMPython.OMCProcessLocal() + om = OMPython.OMCSessionZMQ(omc_process=omp) + + _run_OMCPath_checks(om) + + del om + + +@skip_on_windows +@skip_python_older_312 +def test_OMCPath_OMCProcessDocker(): + omcp = OMPython.OMCProcessDocker(docker="openmodelica/openmodelica:v1.25.0-minimal") + om = OMPython.OMCSessionZMQ(omc_process=omcp) + assert om.sendExpression("getVersion()") == "OpenModelica 1.25.0" + + _run_OMCPath_checks(om) + + del omcp + del om + + +@pytest.mark.skip(reason="Not able to run WSL on github") +@skip_python_older_312 +def test_OMCPath_OMCProcessWSL(): + omcp = OMPython.OMCProcessWSL( + wsl_omc='omc', + wsl_user='omc', + timeout=30.0, + ) + om = OMPython.OMCSessionZMQ(omc_process=omcp) + + _run_OMCPath_checks(om) + + del omcp + del om + + +def _run_OMCPath_checks(om: OMPython.OMCSessionZMQ): + p1 = om.omcpath_tempdir() + p2 = p1 / 'test' + p2.mkdir() + assert p2.is_dir() + p3 = p2 / '..' / p2.name / 'test.txt' + assert p3.is_file() is False + assert p3.write_text('test') + assert p3.is_file() + assert p3.size() > 0 + p3 = p3.resolve().absolute() + assert str(p3) == str((p2 / 'test.txt').resolve().absolute()) + assert p3.read_text() == "test" + assert p3.is_file() + assert p3.parent.is_dir() + p3.unlink() + assert p3.is_file() is False diff --git a/tests/test_optimization.py b/tests/test_optimization.py index b4164397..908cfd62 100644 --- a/tests/test_optimization.py +++ b/tests/test_optimization.py @@ -47,7 +47,9 @@ def test_optimization_example(tmp_path): r = mod.optimize() # it is necessary to specify resultfile, otherwise it wouldn't find it. - time, f, v = mod.getSolutions(["time", "f", "v"], resultfile=r["resultFile"]) + resultfile_str = r["resultFile"] + resultfile_omcpath = mod._getconn.omcpath(resultfile_str) + time, f, v = mod.getSolutions(["time", "f", "v"], resultfile=resultfile_omcpath.as_posix()) assert np.isclose(f[0], 10) assert np.isclose(f[-1], -10)