Skip to content
86 changes: 51 additions & 35 deletions OMPython/ModelicaSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,15 @@
import numbers
import numpy as np
import os
import pathlib
import platform
import re
import subprocess
import tempfile
import textwrap
from typing import Optional, Any
import warnings
import xml.etree.ElementTree as ET

from OMPython.OMCSession import OMCSessionException, OMCSessionZMQ, OMCProcessLocal
from OMPython.OMCSession import OMCSessionException, OMCSessionZMQ, OMCProcessLocal, OMCPath

# define logger using the current module name as ID
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -114,8 +112,8 @@ def __getitem__(self, index: int):
class ModelicaSystemCmd:
"""A compiled model executable."""

def __init__(self, runpath: pathlib.Path, modelname: str, timeout: Optional[float] = None) -> None:
self._runpath = pathlib.Path(runpath).resolve().absolute()
def __init__(self, runpath: OMCPath, modelname: str, timeout: Optional[float] = None) -> None:
self._runpath = runpath
self._model_name = modelname
self._timeout = timeout

Expand Down Expand Up @@ -229,7 +227,7 @@ def args_set(
for arg in args:
self.arg_set(key=arg, val=args[arg])

def get_exe(self) -> pathlib.Path:
def get_exe(self) -> OMCPath:
"""Get the path to the compiled model executable."""
if platform.system() == "Windows":
path_exe = self._runpath / f"{self._model_name}.exe"
Expand Down Expand Up @@ -349,7 +347,7 @@ def parse_simflags(simflags: str) -> dict[str, Optional[str | dict[str, Any] | n
class ModelicaSystem:
def __init__(
self,
fileName: Optional[str | os.PathLike | pathlib.Path] = None,
fileName: Optional[str | os.PathLike] = None,
modelName: Optional[str] = None,
lmodel: Optional[list[str | tuple[str, str]]] = None,
commandLineOptions: Optional[list[str]] = None,
Expand Down Expand Up @@ -446,15 +444,25 @@ def __init__(

self._lmodel = lmodel # may be needed if model is derived from other model
self._model_name = modelName # Model class name
self._file_name = pathlib.Path(fileName).resolve() if fileName is not None else None # Model file/package name
if fileName is not None:
file_name = self._getconn.omcpath(fileName).resolve()
else:
file_name = None
self._file_name: Optional[OMCPath] = file_name # Model file/package name
self._simulated = False # True if the model has already been simulated
self._result_file: Optional[pathlib.Path] = None # for storing result file
self._result_file: Optional[OMCPath] = None # for storing result file
self._variable_filter = variableFilter

if self._file_name is not None and not self._file_name.is_file(): # if file does not exist
raise IOError(f"{self._file_name} does not exist!")

self._work_dir: pathlib.Path = self.setWorkDirectory(customBuildDirectory)
# set default command Line Options for linearization as
# linearize() will use the simulation executable and runtime
# flag -l to perform linearization
self.setCommandLineOptions("--linearizationDumpLanguage=python")
self.setCommandLineOptions("--generateSymbolicLinearization")

self._work_dir: OMCPath = self.setWorkDirectory(customBuildDirectory)

if self._file_name is not None:
self._loadLibrary(lmodel=self._lmodel)
Expand All @@ -474,7 +482,7 @@ def setCommandLineOptions(self, commandLineOptions: str):
exp = f'setCommandLineOptions("{commandLineOptions}")'
self.sendExpression(exp)

def _loadFile(self, fileName: pathlib.Path):
def _loadFile(self, fileName: OMCPath):
# load file
self.sendExpression(f'loadFile("{fileName.as_posix()}")')

Expand Down Expand Up @@ -502,17 +510,17 @@ def _loadLibrary(self, lmodel: list):
'1)["Modelica"]\n'
'2)[("Modelica","3.2.3"), "PowerSystems"]\n')

def setWorkDirectory(self, customBuildDirectory: Optional[str | os.PathLike] = None) -> pathlib.Path:
def setWorkDirectory(self, customBuildDirectory: Optional[str | os.PathLike] = None) -> OMCPath:
"""
Define the work directory for the ModelicaSystem / OpenModelica session. The model is build within this
directory. If no directory is defined a unique temporary directory is created.
"""
if customBuildDirectory is not None:
workdir = pathlib.Path(customBuildDirectory).absolute()
workdir = self._getconn.omcpath(customBuildDirectory).absolute()
if not workdir.is_dir():
raise IOError(f"Provided work directory does not exists: {customBuildDirectory}!")
else:
workdir = pathlib.Path(tempfile.mkdtemp()).absolute()
workdir = self._getconn.omcpath_tempdir().absolute()
if not workdir.is_dir():
raise IOError(f"{workdir} could not be created")

Expand All @@ -525,7 +533,7 @@ def setWorkDirectory(self, customBuildDirectory: Optional[str | os.PathLike] = N
# ... and also return the defined path
return workdir

def getWorkDirectory(self) -> pathlib.Path:
def getWorkDirectory(self) -> OMCPath:
"""
Return the defined working directory for this ModelicaSystem / OpenModelica session.
"""
Expand All @@ -546,7 +554,7 @@ def buildModel(self, variableFilter: Optional[str] = None):
buildModelResult = self._requestApi(apiName="buildModel", entity=self._model_name, properties=var_filter)
logger.debug("OM model build result: %s", buildModelResult)

xml_file = pathlib.Path(buildModelResult[0]).parent / buildModelResult[1]
xml_file = self._getconn.omcpath(buildModelResult[0]).parent / buildModelResult[1]
self._xmlparse(xml_file=xml_file)

def sendExpression(self, expr: str, parsed: bool = True) -> Any:
Expand Down Expand Up @@ -578,7 +586,7 @@ def _requestApi(

return self.sendExpression(exp)

def _xmlparse(self, xml_file: pathlib.Path):
def _xmlparse(self, xml_file: OMCPath):
if not xml_file.is_file():
raise ModelicaSystemError(f"XML file not generated: {xml_file}")

Expand Down Expand Up @@ -998,7 +1006,7 @@ def getOptimizationOptions(self, names: Optional[str | list[str]] = None) -> dic

def simulate_cmd(
self,
result_file: pathlib.Path,
result_file: OMCPath,
simflags: Optional[str] = None,
simargs: Optional[dict[str, Optional[str | dict[str, Any] | numbers.Number]]] = None,
timeout: Optional[float] = None,
Expand Down Expand Up @@ -1102,10 +1110,15 @@ def simulate(
if resultfile is None:
# default result file generated by OM
self._result_file = self.getWorkDirectory() / f"{self._model_name}_res.mat"
elif os.path.exists(resultfile):
self._result_file = pathlib.Path(resultfile)
elif isinstance(resultfile, OMCPath):
self._result_file = resultfile
else:
self._result_file = self.getWorkDirectory() / resultfile
self._result_file = self._getconn.omcpath(resultfile)
if not self._result_file.is_absolute():
self._result_file = self.getWorkDirectory() / resultfile

if not isinstance(self._result_file, OMCPath):
raise ModelicaSystemError(f"Invalid result file path: {self._result_file} - must be an OMCPath object!")

om_cmd = self.simulate_cmd(
result_file=self._result_file,
Expand All @@ -1124,15 +1137,19 @@ def simulate(
# check for an empty (=> 0B) result file which indicates a crash of the model executable
# see: https://github.com/OpenModelica/OMPython/issues/261
# https://github.com/OpenModelica/OpenModelica/issues/13829
if self._result_file.stat().st_size == 0:
if self._result_file.size() == 0:
self._result_file.unlink()
raise ModelicaSystemError("Empty result file - this indicates a crash of the model executable!")

logger.warning(f"Return code = {returncode} but result file exists!")

self._simulated = True

def getSolutions(self, varList: Optional[str | list[str]] = None, resultfile: Optional[str] = None) -> tuple[str] | np.ndarray:
def getSolutions(
self,
varList: Optional[str | list[str]] = None,
resultfile: Optional[str | os.PathLike] = None,
) -> tuple[str] | np.ndarray:
"""Extract simulation results from a result data file.

Args:
Expand Down Expand Up @@ -1169,7 +1186,7 @@ def getSolutions(self, varList: Optional[str | list[str]] = None, resultfile: Op
raise ModelicaSystemError("No result file found. Run simulate() first.")
result_file = self._result_file
else:
result_file = pathlib.Path(resultfile)
result_file = self._getconn.omcpath(resultfile)

# check if the result file exits
if not result_file.is_file():
Expand Down Expand Up @@ -1461,7 +1478,7 @@ def setInputs(

return True

def _createCSVData(self, csvfile: Optional[pathlib.Path] = None) -> pathlib.Path:
def _createCSVData(self, csvfile: Optional[OMCPath] = None) -> OMCPath:
"""
Create a csv file with inputs for the simulation/optimization of the model. If csvfile is provided as argument,
this file is used; else a generic file name is created.
Expand Down Expand Up @@ -1628,7 +1645,6 @@ def linearize(
* `result = linearize(); A = result[0]` mostly just for backwards
compatibility, because linearize() used to return `[A, B, C, D]`.
"""

if len(self._quantities) == 0:
# if self._quantities has no content, the xml file was not parsed; see self._xmlparse()
raise ModelicaSystemError(
Expand All @@ -1642,15 +1658,15 @@ def linearize(
timeout=timeout,
)

overrideLinearFile = self.getWorkDirectory() / f'{self._model_name}_override_linear.txt'

with open(file=overrideLinearFile, mode="w", encoding="utf-8") as fh:
for key1, value1 in self._override_variables.items():
fh.write(f"{key1}={value1}\n")
for key2, value2 in self._linearization_options.items():
fh.write(f"{key2}={value2}\n")
override_content = (
"\n".join([f"{key}={value}" for key, value in self._override_variables.items()])
+ "\n".join([f"{key}={value}" for key, value in self._linearization_options.items()])
+ "\n"
)
override_file = self.getWorkDirectory() / f'{self._model_name}_override_linear.txt'
override_file.write_text(override_content)

om_cmd.arg_set(key="overrideFile", val=overrideLinearFile.as_posix())
om_cmd.arg_set(key="overrideFile", val=override_file.as_posix())

if self._inputs:
for key in self._inputs:
Expand Down Expand Up @@ -1678,7 +1694,7 @@ def linearize(
returncode = om_cmd.run()
if returncode != 0:
raise ModelicaSystemError(f"Linearize failed with return code: {returncode}")
if not linear_file.exists():
if not linear_file.is_file():
raise ModelicaSystemError(f"Linearization failed: {linear_file} not found!")

self._simulated = True
Expand Down
5 changes: 3 additions & 2 deletions tests/test_FMIExport.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import OMPython
import shutil
import os
import pathlib


def test_CauerLowPassAnalog():
mod = OMPython.ModelicaSystem(modelName="Modelica.Electrical.Analog.Examples.CauerLowPassAnalog",
lmodel=["Modelica"])
tmp = mod.getWorkDirectory()
tmp = pathlib.Path(mod.getWorkDirectory())
try:
fmu = mod.convertMo2Fmu(fileNamePrefix="CauerLowPassAnalog")
assert os.path.exists(fmu)
Expand All @@ -16,7 +17,7 @@ def test_CauerLowPassAnalog():

def test_DrumBoiler():
mod = OMPython.ModelicaSystem(modelName="Modelica.Fluid.Examples.DrumBoiler.DrumBoiler", lmodel=["Modelica"])
tmp = mod.getWorkDirectory()
tmp = pathlib.Path(mod.getWorkDirectory())
try:
fmu = mod.convertMo2Fmu(fileNamePrefix="DrumBoiler")
assert os.path.exists(fmu)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_ModelicaSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def test_customBuildDirectory(tmp_path, model_firstorder):
tmpdir = tmp_path / "tmpdir1"
tmpdir.mkdir()
m = OMPython.ModelicaSystem(filePath, "M", customBuildDirectory=tmpdir)
assert m.getWorkDirectory().resolve() == tmpdir.resolve()
assert pathlib.Path(m.getWorkDirectory()).resolve() == tmpdir.resolve()
result_file = tmpdir / "a.mat"
assert not result_file.exists()
m.simulate(resultfile="a.mat")
Expand Down
4 changes: 3 additions & 1 deletion tests/test_optimization.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ def test_optimization_example(tmp_path):

r = mod.optimize()
# it is necessary to specify resultfile, otherwise it wouldn't find it.
time, f, v = mod.getSolutions(["time", "f", "v"], resultfile=r["resultFile"])
resultfile_str = r["resultFile"]
resultfile_omcpath = mod._getconn.omcpath(resultfile_str)
time, f, v = mod.getSolutions(["time", "f", "v"], resultfile=resultfile_omcpath.as_posix())
assert np.isclose(f[0], 10)
assert np.isclose(f[-1], -10)

Expand Down