diff --git a/OMPython/OMCSession.py b/OMPython/OMCSession.py index 593fa910d..7b8d0595e 100644 --- a/OMPython/OMCSession.py +++ b/OMPython/OMCSession.py @@ -35,6 +35,7 @@ """ import getpass +import io import json import logging import os @@ -48,7 +49,7 @@ import sys import tempfile import time -from typing import Any, Optional +from typing import Any, Optional, Tuple import uuid import warnings import zmq @@ -109,7 +110,7 @@ def _ask(self, question: str, opt: Optional[list[str]] = None, parsed: bool = Tr try: res = self._session.sendExpression(expression, parsed=parsed) except OMCSessionException as ex: - raise OMCSessionException("OMC _ask() failed: %s (parsed=%s)", expression, parsed) from ex + raise OMCSessionException("OMC _ask() failed: %s (parsed=%s)", (expression, parsed)) from ex # save response self._omc_cache[p] = res @@ -270,284 +271,66 @@ def getClassNames(self, className=None, recursive=False, qualified=False, sort=F class OMCSessionZMQ: - def __init__(self, - timeout: float = 10.00, - docker: Optional[str] = None, - dockerContainer: Optional[int] = None, - dockerExtraArgs: Optional[list] = None, - dockerOpenModelicaPath: str = "omc", - dockerNetwork: Optional[str] = None, - port: Optional[int] = None, - omhome: Optional[str] = None): - if dockerExtraArgs is None: - dockerExtraArgs = [] - - self._omhome = self._get_omhome(omhome=omhome) - - self._omc_process = None - self._omc_command = None - self._omc: Optional[Any] = None - self._dockerCid: Optional[int] = None - self._serverIPAddress = "127.0.0.1" - self._interactivePort = None - self._temp_dir = pathlib.Path(tempfile.gettempdir()) - # generate a random string for this session - self._random_string = uuid.uuid4().hex - try: - self._currentUser = getpass.getuser() - if not self._currentUser: - self._currentUser = "nobody" - except KeyError: - # We are running as a uid not existing in the password database... Pretend we are nobody - self._currentUser = "nobody" - - self._docker = docker - self._dockerContainer = dockerContainer - self._dockerExtraArgs = dockerExtraArgs - self._dockerOpenModelicaPath = dockerOpenModelicaPath - self._dockerNetwork = dockerNetwork - self._omc_log_file = self._create_omc_log_file("port") - self._timeout = timeout - # Locating and using the IOR - if sys.platform != 'win32' or docker or dockerContainer: - port_file = "openmodelica." + self._currentUser + ".port." + self._random_string - else: - port_file = "openmodelica.port." + self._random_string - self._port_file = ((pathlib.Path("/tmp") if docker else self._temp_dir) / port_file).as_posix() - self._interactivePort = port - # set omc executable path and args - self._omc_command = self._set_omc_command(omc_path_and_args_list=["--interactive=zmq", - "--locale=C", - f"-z={self._random_string}"]) - # start up omc executable, which is waiting for the ZMQ connection - self._omc_process = self._start_omc_process(timeout) - # connect to the running omc instance using ZMQ - self._omc_port = self._connect_to_omc(timeout) - - self._re_log_entries = None - self._re_log_raw = None - - def __del__(self): - try: - self.sendExpression("quit()") - except OMCSessionException: - pass - self._omc_log_file.close() - try: - self._omc_process.wait(timeout=2.0) - except subprocess.TimeoutExpired: - if self._omc_process: - logger.warning("OMC did not exit after being sent the quit() command; " - "killing the process with pid=%s", self._omc_process.pid) - self._omc_process.kill() - self._omc_process.wait() - - def _create_omc_log_file(self, suffix): # output? - if sys.platform == 'win32': - log_filename = f"openmodelica.{suffix}.{self._random_string}.log" - else: - log_filename = f"openmodelica.{self._currentUser}.{suffix}.{self._random_string}.log" - # this file must be closed in the destructor - omc_log_file = open(self._temp_dir / log_filename, "w+") - - return omc_log_file - - def _start_omc_process(self, timeout): # output? - if sys.platform == 'win32': - omhome_bin = (self._omhome / "bin").as_posix() - my_env = os.environ.copy() - my_env["PATH"] = omhome_bin + os.pathsep + my_env["PATH"] - omc_process = subprocess.Popen(self._omc_command, stdout=self._omc_log_file, - stderr=self._omc_log_file, env=my_env) - else: - # set the user environment variable so omc running from wsgi has the same user as OMPython - my_env = os.environ.copy() - my_env["USER"] = self._currentUser - omc_process = subprocess.Popen(self._omc_command, stdout=self._omc_log_file, - stderr=self._omc_log_file, env=my_env) - if self._docker: - for i in range(0, 40): - try: - with open(self._dockerCidFile, "r") as fin: - self._dockerCid = fin.read().strip() - except IOError: - pass - if self._dockerCid: - break - time.sleep(timeout / 40.0) - try: - os.remove(self._dockerCidFile) - except FileNotFoundError: - pass - if self._dockerCid is None: - logger.error("Docker did not start. Log-file says:\n%s" % (open(self._omc_log_file.name).read())) - raise OMCSessionException("Docker did not start (timeout=%f might be too short especially if you did " - "not docker pull the image before this command)." % timeout) - - dockerTop = None - if self._docker or self._dockerContainer: - if self._dockerNetwork == "separate": - output = subprocess.check_output(["docker", "inspect", self._dockerCid]).decode().strip() - self._serverIPAddress = json.loads(output)[0]["NetworkSettings"]["IPAddress"] - for i in range(0, 40): - if sys.platform == 'win32': - break - dockerTop = subprocess.check_output(["docker", "top", self._dockerCid]).decode().strip() - omc_process = None - for line in dockerTop.split("\n"): - columns = line.split() - if self._random_string in line: - try: - omc_process = DummyPopen(int(columns[1])) - except psutil.NoSuchProcess: - raise OMCSessionException( - f"Could not find PID {dockerTop} - is this a docker instance spawned " - f"without --pid=host?\nLog-file says:\n{open(self._omc_log_file.name).read()}") - break - if omc_process is not None: - break - time.sleep(timeout / 40.0) - if omc_process is None: - raise OMCSessionException("Docker top did not contain omc process %s:\n%s\nLog-file says:\n%s" - % (self._random_string, dockerTop, open(self._omc_log_file.name).read())) - return omc_process - - def _getuid(self): + def __init__( + self, + timeout: float = 10.00, + omhome: Optional[str] = None, + omc_process: Optional[OMCProcess] = None, + ) -> None: """ - The uid to give to docker. - On Windows, volumes are mapped with all files are chmod ugo+rwx, - so uid does not matter as long as it is not the root user. - """ - return 1000 if sys.platform == 'win32' else os.getuid() + Initialisation for OMCSessionZMQ - def _set_omc_command(self, omc_path_and_args_list) -> list: - """Define the command that will be called by the subprocess module. - - On Windows, use the list input style of the subprocess module to - avoid problems resulting from spaces in the path string. - Linux, however, only works with the string version. + Parameters + ---------- + timeout + omhome + omc_process """ - if (self._docker or self._dockerContainer) and sys.platform == "win32": - extraFlags = ["-d=zmqDangerousAcceptConnectionsFromAnywhere"] - if not self._interactivePort: - raise OMCSessionException("docker on Windows requires knowing which port to connect to. For " - "dockerContainer=..., the container needs to have already manually exposed " - "this port when it was started (-p 127.0.0.1:n:n) or you get an error later.") - else: - extraFlags = [] - if self._docker: - if sys.platform == "win32": - p = int(self._interactivePort) - dockerNetworkStr = ["-p", "127.0.0.1:%d:%d" % (p, p)] - elif self._dockerNetwork == "host" or self._dockerNetwork is None: - dockerNetworkStr = ["--network=host"] - elif self._dockerNetwork == "separate": - dockerNetworkStr = [] - extraFlags = ["-d=zmqDangerousAcceptConnectionsFromAnywhere"] - else: - raise OMCSessionException('dockerNetwork was set to %s, but only \"host\" or \"separate\" is allowed') - self._dockerCidFile = self._omc_log_file.name + ".docker.cid" - omcCommand = (["docker", "run", - "--cidfile", self._dockerCidFile, - "--rm", - "--env", "USER=%s" % self._currentUser, - "--user", str(self._getuid())] - + self._dockerExtraArgs - + dockerNetworkStr - + [self._docker, self._dockerOpenModelicaPath]) - elif self._dockerContainer: - omcCommand = (["docker", "exec", - "--env", "USER=%s" % self._currentUser, - "--user", str(self._getuid())] - + self._dockerExtraArgs - + [self._dockerContainer, self._dockerOpenModelicaPath]) - self._dockerCid = self._dockerContainer - else: - omcCommand = [str(self._get_omc_path())] - if self._interactivePort: - extraFlags = extraFlags + ["--interactivePort=%d" % int(self._interactivePort)] - - omc_command = omcCommand + omc_path_and_args_list + extraFlags - return omc_command - - def _get_omhome(self, omhome: Optional[str] = None): - # use the provided path - if omhome is not None: - return pathlib.Path(omhome) + self._timeout = timeout - # check the environment variable - omhome = os.environ.get('OPENMODELICAHOME') - if omhome is not None: - return pathlib.Path(omhome) + if omc_process is None: + omc_process = OMCProcessLocal(omhome=omhome, timeout=timeout) + elif not isinstance(omc_process, OMCProcess): + raise OMCSessionException("Invalid definition of the OMC process!") + self.omc_process = omc_process - # Get the path to the OMC executable, if not installed this will be None - path_to_omc = shutil.which("omc") - if path_to_omc is not None: - return pathlib.Path(path_to_omc).parents[1] + port = self.omc_process.get_port() + if not isinstance(port, str): + raise OMCSessionException(f"Invalid content for port: {port}") - raise OMCSessionException("Cannot find OpenModelica executable, please install from openmodelica.org") + # Create the ZeroMQ socket and connect to OMC server + context = zmq.Context.instance() + omc = context.socket(zmq.REQ) + omc.setsockopt(zmq.LINGER, 0) # Dismisses pending messages if closed + omc.setsockopt(zmq.IMMEDIATE, True) # Queue messages only to completed connections + omc.connect(port) - def _get_omc_path(self) -> pathlib.Path: - return self._omhome / "bin" / "omc" + self.omc_zmq: Optional[zmq.Socket[bytes]] = omc - def _connect_to_omc(self, timeout) -> str: - omc_zeromq_uri = "file:///" + self._port_file - # See if the omc server is running - attempts = 0 - port = None - while True: - if self._dockerCid: - try: - port = subprocess.check_output(args=["docker", - "exec", str(self._dockerCid), - "cat", str(self._port_file)], - stderr=subprocess.DEVNULL).decode().strip() - break - except subprocess.CalledProcessError: - pass - else: - if os.path.isfile(self._port_file): - # Read the port file - with open(self._port_file, 'r') as f_p: - port = f_p.readline() - os.remove(self._port_file) - break + # variables to store compiled re expressions use in self.sendExpression() + self._re_log_entries: Optional[re.Pattern[str]] = None + self._re_log_raw: Optional[re.Pattern[str]] = None - attempts += 1 - if attempts == 80.0: - name = self._omc_log_file.name - self._omc_log_file.close() - logger.error("OMC Server did not start. Please start it! Log-file says:\n%s" % open(name).read()) - raise OMCSessionException(f"OMC Server did not start (timeout={timeout}). " - f"Could not open file {self._port_file}") - time.sleep(timeout / 80.0) - - port = port.replace("0.0.0.0", self._serverIPAddress) - logger.info(f"OMC Server is up and running at {omc_zeromq_uri} " - f"pid={self._omc_process.pid if self._omc_process else '?'} cid={self._dockerCid}") + def __del__(self): + if isinstance(self.omc_zmq, zmq.Socket): + try: + self.sendExpression("quit()") + except OMCSessionException: + pass - # Create the ZeroMQ socket and connect to OMC server - context = zmq.Context.instance() - self._omc = context.socket(zmq.REQ) - self._omc.setsockopt(zmq.LINGER, 0) # Dismisses pending messages if closed - self._omc.setsockopt(zmq.IMMEDIATE, True) # Queue messages only to completed connections - self._omc.connect(port) + del self.omc_zmq - return port + self.omc_zmq = None - def execute(self, command): + def execute(self, command: str): warnings.warn("This function is depreciated and will be removed in future versions; " - "please use sendExpression() instead", DeprecationWarning, stacklevel=1) + "please use sendExpression() instead", DeprecationWarning, stacklevel=2) return self.sendExpression(command, parsed=False) - def sendExpression(self, command, parsed=True): - p = self._omc_process.poll() # check if process is running - if p is not None: - raise OMCSessionException("Process Exited, No connection with OMC. Create a new instance of OMCSessionZMQ!") - - if self._omc is None: + def sendExpression(self, command: str, parsed: bool = True) -> Any: + if self.omc_zmq is None: raise OMCSessionException("No OMC running. Create a new instance of OMCSessionZMQ!") logger.debug("sendExpression(%r, parsed=%r)", command, parsed) @@ -555,23 +338,21 @@ def sendExpression(self, command, parsed=True): attempts = 0 while True: try: - self._omc.send_string(str(command), flags=zmq.NOBLOCK) + self.omc_zmq.send_string(str(command), flags=zmq.NOBLOCK) break except zmq.error.Again: pass attempts += 1 if attempts >= 50: - self._omc_log_file.seek(0) - log = self._omc_log_file.read() - self._omc_log_file.close() - raise OMCSessionException(f"No connection with OMC (timeout={self._timeout}). Log-file says: \n{log}") + raise OMCSessionException(f"No connection with OMC (timeout={self._timeout}). " + f"Log-file says: \n{self.omc_process.get_log()}") time.sleep(self._timeout / 50.0) if command == "quit()": - self._omc.close() - self._omc = None + self.omc_zmq.close() + self.omc_zmq = None return None - result = self._omc.recv_string() + result = self.omc_zmq.recv_string() if command == "getErrorString()": # no error handling if 'getErrorString()' is called @@ -586,8 +367,8 @@ def sendExpression(self, command, parsed=True): return result # always check for error - self._omc.send_string('getMessagesStringInternal()', flags=zmq.NOBLOCK) - error_raw = self._omc.recv_string() + self.omc_zmq.send_string('getMessagesStringInternal()', flags=zmq.NOBLOCK) + error_raw = self.omc_zmq.recv_string() # run error handling only if there is something to check if error_raw != "{}\n": if not self._re_log_entries: @@ -643,3 +424,536 @@ def sendExpression(self, command, parsed=True): return om_parser_basic(result) except (TypeError, UnboundLocalError) as ex: raise OMCSessionException("Cannot parse OMC result") from ex + + +class OMCProcess: + + def __init__( + self, + timeout: float = 10.00, + **kwargs, + ) -> None: + super().__init__(**kwargs) + + # store variables + self._timeout = timeout + + # omc process + self._omc_process: Optional[subprocess.Popen] = None + # omc ZMQ port to use + self._omc_port: Optional[str] = None + + # generate a random string for this session + self._random_string = uuid.uuid4().hex + + # get a user ID + try: + self._currentUser = getpass.getuser() + if not self._currentUser: + self._currentUser = "nobody" + except KeyError: + # We are running as a uid not existing in the password database... Pretend we are nobody + self._currentUser = "nobody" + + # omc port and log file + if sys.platform == 'win32': + self._omc_file_port = f"openmodelica.port.{self._random_string}" + else: + self._omc_file_port = f"openmodelica.{self._currentUser}.port.{self._random_string}" + + # get a temporary directory + self._temp_dir = pathlib.Path(tempfile.gettempdir()) + + # setup log file - this file must be closed in the destructor + logfile = self._temp_dir / (self._omc_file_port + '.log') + self._omc_loghandle: Optional[io.TextIOWrapper] = None + try: + self._omc_loghandle = open(file=logfile, mode="w+", encoding="utf-8") + except OSError as ex: + raise OMCSessionException(f"Cannot open log file {logfile}.") from ex + + def __del__(self): + if self._omc_loghandle is not None: + try: + self._omc_loghandle.close() + except (OSError, IOError): + pass + self._omc_loghandle = None + + if isinstance(self._omc_process, subprocess.Popen): + try: + self._omc_process.wait(timeout=2.0) + except subprocess.TimeoutExpired: + if self._omc_process: + logger.warning("OMC did not exit after being sent the quit() command; " + "killing the process with pid=%s", self._omc_process.pid) + self._omc_process.kill() + self._omc_process.wait() + finally: + self._omc_process = None + + def get_port(self) -> Optional[str]: + if not isinstance(self._omc_port, str): + raise OMCSessionException(f"Invalid port to connect to OMC process: {self._omc_port}") + return self._omc_port + + def get_log(self) -> str: + if self._omc_loghandle is None: + raise OMCSessionException("Log file not available!") + + self._omc_loghandle.seek(0) + log = self._omc_loghandle.read() + + return log + + +class OMCProcessPort(OMCProcess): + + def __init__( + self, + omc_port: str, + ) -> None: + super().__init__() + self._omc_port = omc_port + + +class OMCProcessLocal(OMCProcess): + + def __init__( + self, + timeout: float = 10.00, + omhome: Optional[str] = None, + ) -> None: + + super().__init__(timeout=timeout) + + # where to find OpenModelica + self._omhome = self._omc_home_get(omhome=omhome) + # start up omc executable, which is waiting for the ZMQ connection + self._omc_process = self._omc_process_get() + # connect to the running omc instance using ZMQ + self._omc_port = self._omc_port_get() + + @staticmethod + def _omc_home_get(omhome: Optional[str] = None) -> pathlib.Path: + # use the provided path + if omhome is not None: + return pathlib.Path(omhome) + + # check the environment variable + omhome = os.environ.get('OPENMODELICAHOME') + if omhome is not None: + return pathlib.Path(omhome) + + # Get the path to the OMC executable, if not installed this will be None + path_to_omc = shutil.which("omc") + if path_to_omc is not None: + return pathlib.Path(path_to_omc).parents[1] + + raise OMCSessionException("Cannot find OpenModelica executable, please install from openmodelica.org") + + def _omc_process_get(self) -> subprocess.Popen: + my_env = os.environ.copy() + my_env["PATH"] = (self._omhome / "bin").as_posix() + os.pathsep + my_env["PATH"] + + omc_command = [ + (self._omhome / "bin" / "omc").as_posix(), + "--locale=C", + "--interactive=zmq", + f"-z={self._random_string}"] + + omc_process = subprocess.Popen(omc_command, + stdout=self._omc_loghandle, + stderr=self._omc_loghandle, + env=my_env) + return omc_process + + def _omc_port_get(self) -> str: + port = None + + # See if the omc server is running + attempts = 0 + while True: + omc_file_port = self._temp_dir / self._omc_file_port + + if omc_file_port.is_file(): + # Read the port file + with open(file=omc_file_port, mode='r', encoding="utf-8") as f_p: + port = f_p.readline() + break + + if port is not None: + break + + attempts += 1 + if attempts == 80.0: + raise OMCSessionException(f"OMC Server did not start (timeout={self._timeout}). " + f"Could not open file {omc_file_port}. " + f"Log-file says:\n{self.get_log()}") + time.sleep(self._timeout / 80.0) + + logger.info(f"Local OMC Server is up and running at ZMQ port {port} " + f"pid={self._omc_process.pid if isinstance(self._omc_process, subprocess.Popen) else '?'}") + + return port + + +class OMCProcessDockerHelper: + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + + self._dockerExtraArgs: list = [] + self._dockerOpenModelicaPath: Optional[str] = None + self._dockerNetwork: Optional[str] = None + + self._interactivePort: Optional[int] = None + + self._dockerCid: Optional[str] = None + self._docker_process: Optional[DummyPopen] = None + + @staticmethod + def _omc_process_docker(dockerCid: str, random_string: str, timeout: float) -> Optional[DummyPopen]: + if sys.platform == 'win32': + raise NotImplementedError("Docker not supported on win32!") + + docker_process = None + for idx in range(0, 40): + dockerTop = subprocess.check_output(["docker", "top", dockerCid]).decode().strip() + docker_process = None + for line in dockerTop.split("\n"): + columns = line.split() + if random_string in line: + try: + docker_process = DummyPopen(int(columns[1])) + except psutil.NoSuchProcess as ex: + raise OMCSessionException(f"Could not find PID {dockerTop} - " + "is this a docker instance spawned without --pid=host?") from ex + + if docker_process is not None: + break + time.sleep(timeout / 40.0) + + return docker_process + + @staticmethod + def _getuid() -> int: + """ + The uid to give to docker. + On Windows, volumes are mapped with all files are chmod ugo+rwx, + so uid does not matter as long as it is not the root user. + """ + return 1000 if sys.platform == 'win32' else os.getuid() + + def get_server_address(self) -> Optional[str]: + if self._dockerNetwork == "separate" and isinstance(self._dockerCid, str): + output = subprocess.check_output(["docker", "inspect", self._dockerCid]).decode().strip() + return json.loads(output)[0]["NetworkSettings"]["IPAddress"] + + return None + + def get_docker_container_id(self) -> str: + if not isinstance(self._dockerCid, str): + raise OMCSessionException(f"Invalid docker container ID: {self._dockerCid}!") + + return self._dockerCid + + +class OMCProcessDocker(OMCProcessDockerHelper, OMCProcess): + + def __init__( + self, + timeout: float = 10.00, + docker: Optional[str] = None, + dockerExtraArgs: Optional[list] = None, + dockerOpenModelicaPath: str = "omc", + dockerNetwork: Optional[str] = None, + port: Optional[int] = None, + ) -> None: + + super().__init__(timeout=timeout) + + if docker is None: + raise OMCSessionException("Argument docker must be set!") + + self._docker = docker + + if dockerExtraArgs is None: + dockerExtraArgs = [] + + self._dockerExtraArgs = dockerExtraArgs + self._dockerOpenModelicaPath = dockerOpenModelicaPath + self._dockerNetwork = dockerNetwork + + self._interactivePort = port + + self._dockerCidFile: Optional[pathlib.Path] = None + + # start up omc executable in docker container waiting for the ZMQ connection + self._omc_process, self._docker_process = self._omc_docker_start() + # connect to the running omc instance using ZMQ + self._omc_port = self._omc_port_get() + + def __del__(self) -> None: + + super().__del__() + + if isinstance(self._docker_process, DummyPopen): + try: + self._docker_process.wait(timeout=2.0) + except subprocess.TimeoutExpired: + if self._docker_process: + logger.warning("OMC did not exit after being sent the quit() command; " + "killing the process with pid=%s", self._docker_process.pid) + self._docker_process.kill() + self._docker_process.wait(timeout=2.0) + finally: + self._docker_process = None + + def _omc_command_docker(self, omc_path_and_args_list) -> list: + """ + Define the command that will be called by the subprocess module. + """ + extraFlags = [] + + if sys.platform == "win32": + extraFlags = ["-d=zmqDangerousAcceptConnectionsFromAnywhere"] + if not self._interactivePort: + raise OMCSessionException("docker on Windows requires knowing which port to connect to. For " + "dockerContainer=..., the container needs to have already manually exposed " + "this port when it was started (-p 127.0.0.1:n:n) or you get an error later.") + + if sys.platform == "win32": + if isinstance(self._interactivePort, str): + port = int(self._interactivePort) + elif isinstance(self._interactivePort, int): + port = self._interactivePort + else: + raise OMCSessionException("Missing or invalid interactive port!") + dockerNetworkStr = ["-p", f"127.0.0.1:{port}:{port}"] + elif self._dockerNetwork == "host" or self._dockerNetwork is None: + dockerNetworkStr = ["--network=host"] + elif self._dockerNetwork == "separate": + dockerNetworkStr = [] + extraFlags = ["-d=zmqDangerousAcceptConnectionsFromAnywhere"] + else: + raise OMCSessionException(f'dockerNetwork was set to {self._dockerNetwork}, ' + 'but only \"host\" or \"separate\" is allowed') + + self._dockerCidFile = self._temp_dir / (self._omc_file_port + ".docker.cid") + + if isinstance(self._interactivePort, int): + extraFlags = extraFlags + [f"--interactivePort={int(self._interactivePort)}"] + + omc_command = (["docker", "run", + "--cidfile", self._dockerCidFile.as_posix(), + "--rm", + "--env", f"USER={self._currentUser}", + "--user", str(self._getuid())] + + self._dockerExtraArgs + + dockerNetworkStr + + [self._docker, self._dockerOpenModelicaPath] + + omc_path_and_args_list + + extraFlags) + + return omc_command + + def _omc_port_get(self) -> str: + omc_file_port = '/tmp/' + self._omc_file_port + port = None + + if not isinstance(self._dockerCid, str): + raise OMCSessionException(f"Invalid docker container ID: {self._dockerCid}") + + # See if the omc server is running + attempts = 0 + while True: + try: + output = subprocess.check_output(args=["docker", + "exec", self._dockerCid, + "cat", omc_file_port], + stderr=subprocess.DEVNULL) + port = output.decode().strip() + except subprocess.CalledProcessError: + pass + + if port is not None: + break + + attempts += 1 + if attempts == 80.0: + raise OMCSessionException(f"Docker based OMC Server did not start (timeout={self._timeout}). " + f"Could not open file {omc_file_port}. " + f"Log-file says:\n{self.get_log()}") + time.sleep(self._timeout / 80.0) + + logger.info(f"OMC Server is up and running at port {port} " + f"pid={self._omc_process.pid if isinstance(self._omc_process, subprocess.Popen) else '?'}") + + return port + + def _omc_docker_start(self) -> Tuple[subprocess.Popen, DummyPopen]: + my_env = os.environ.copy() + my_env["USER"] = self._currentUser + + omc_command = self._omc_command_docker(omc_path_and_args_list=["--locale=C", + "--interactive=zmq", + f"-z={self._random_string}"]) + + omc_process = subprocess.Popen(omc_command, + stdout=self._omc_loghandle, + stderr=self._omc_loghandle, + env=my_env) + + if not isinstance(self._dockerCidFile, pathlib.Path): + raise OMCSessionException(f"Invalid content for docker container ID file path: {self._dockerCidFile}") + + for idx in range(0, 40): + try: + with open(file=self._dockerCidFile, mode="r", encoding="utf-8") as fh: + content = fh.read().strip() + self._dockerCid = content + except IOError: + pass + if self._dockerCid: + break + time.sleep(self._timeout / 40.0) + + if self._dockerCid is None: + logger.error(f"Docker did not start. Log-file says:\n{self.get_log()}") + raise OMCSessionException(f"Docker did not start (timeout={self._timeout} might be too short " + "especially if you did not docker pull the image before this command).") + + docker_process = self._omc_process_docker(dockerCid=self._dockerCid, + random_string=self._random_string, + timeout=self._timeout) + if docker_process is None: + raise OMCSessionException(f"Docker top did not contain omc process {self._random_string}. " + f"Log-file says:\n{self.get_log()}") + + return omc_process, docker_process + + +class OMCProcessDockerContainer(OMCProcessDockerHelper, OMCProcess): + + def __init__( + self, + timeout: float = 10.00, + dockerContainer: Optional[str] = None, + dockerExtraArgs: Optional[list] = None, + dockerOpenModelicaPath: str = "omc", + dockerNetwork: Optional[str] = None, + port: Optional[int] = None, + ) -> None: + + super().__init__(timeout=timeout) + + if not isinstance(dockerContainer, str): + raise OMCSessionException("Argument dockerContainer must be set!") + + self._dockerCid = dockerContainer + + if dockerExtraArgs is None: + dockerExtraArgs = [] + + self._dockerExtraArgs = dockerExtraArgs + self._dockerOpenModelicaPath = dockerOpenModelicaPath + self._dockerNetwork = dockerNetwork + + self._interactivePort = port + + # start up omc executable in docker container waiting for the ZMQ connection + self._omc_process, self._docker_process = self._omc_docker_start() + # connect to the running omc instance using ZMQ + self._omc_port = self._omc_port_get() + + def __del__(self) -> None: + + super().__del__() + + # docker container ID was provided - do NOT kill the docker process! + self._docker_process = None + + def _omc_command_docker(self, omc_path_and_args_list) -> list: + """ + Define the command that will be called by the subprocess module. + """ + extraFlags: list[str] = [] + + if sys.platform == "win32": + extraFlags = ["-d=zmqDangerousAcceptConnectionsFromAnywhere"] + if not self._interactivePort: + raise OMCSessionException("docker on Windows requires knowing which port to connect to. For " + "dockerContainer=..., the container needs to have already manually exposed " + "this port when it was started (-p 127.0.0.1:n:n) or you get an error later.") + + if isinstance(self._interactivePort, int): + extraFlags = extraFlags + [f"--interactivePort={int(self._interactivePort)}"] + + omc_command = (["docker", "exec", + "--env", f"USER={self._currentUser}", + "--user", str(self._getuid())] + + self._dockerExtraArgs + + [self._dockerCid, self._dockerOpenModelicaPath] + + omc_path_and_args_list + + extraFlags) + + return omc_command + + def _omc_port_get(self) -> str: + omc_file_port = '/tmp/' + self._omc_file_port + port = None + + if not isinstance(self._dockerCid, str): + raise OMCSessionException(f"Invalid docker container ID: {self._dockerCid}") + + # See if the omc server is running + attempts = 0 + while True: + try: + output = subprocess.check_output(args=["docker", + "exec", self._dockerCid, + "cat", omc_file_port], + stderr=subprocess.DEVNULL) + port = output.decode().strip() + except subprocess.CalledProcessError: + pass + + if port is not None: + break + + attempts += 1 + if attempts == 80.0: + raise OMCSessionException(f"Docker container based OMC Server did not start (timeout={self._timeout}). " + f"Could not open file {omc_file_port}. " + f"Log-file says:\n{self.get_log()}") + time.sleep(self._timeout / 80.0) + + logger.info(f"DockerContainer based OMC Server is up and running at port {port}") + + return port + + def _omc_docker_start(self) -> Tuple[subprocess.Popen, DummyPopen]: + my_env = os.environ.copy() + my_env["USER"] = self._currentUser + + omc_command = self._omc_command_docker(omc_path_and_args_list=["--locale=C", + "--interactive=zmq", + f"-z={self._random_string}"]) + + omc_process = subprocess.Popen(omc_command, + stdout=self._omc_loghandle, + stderr=self._omc_loghandle, + env=my_env) + + docker_process = None + if isinstance(self._dockerCid, str): + docker_process = self._omc_process_docker(dockerCid=self._dockerCid, + random_string=self._random_string, + timeout=self._timeout) + + if docker_process is None: + raise OMCSessionException(f"Docker top did not contain omc process {self._random_string} " + f"/ {self._dockerCid}. Log-file says:\n{self.get_log()}") + + return omc_process, docker_process diff --git a/OMPython/__init__.py b/OMPython/__init__.py index ccb067def..93fcdaa2d 100644 --- a/OMPython/__init__.py +++ b/OMPython/__init__.py @@ -37,7 +37,8 @@ """ from OMPython.ModelicaSystem import LinearizationResult, ModelicaSystem, ModelicaSystemCmd, ModelicaSystemError -from OMPython.OMCSession import OMCSessionCmd, OMCSessionException, OMCSessionZMQ +from OMPython.OMCSession import (OMCSessionCmd, OMCSessionException, OMCSessionZMQ, + OMCProcessPort, OMCProcessLocal, OMCProcessDocker, OMCProcessDockerContainer) # global names imported if import 'from OMPython import *' is used __all__ = [ @@ -49,4 +50,8 @@ 'OMCSessionCmd', 'OMCSessionException', 'OMCSessionZMQ', + 'OMCProcessPort', + 'OMCProcessLocal', + 'OMCProcessDocker', + 'OMCProcessDockerContainer', ] diff --git a/tests/test_ZMQ.py b/tests/test_ZMQ.py index e268a6406..30bf78e7b 100644 --- a/tests/test_ZMQ.py +++ b/tests/test_ZMQ.py @@ -41,3 +41,30 @@ def test_execute(om): assert om.execute('"HelloWorld!"') == '"HelloWorld!"\n' assert om.sendExpression('"HelloWorld!"', parsed=False) == '"HelloWorld!"\n' assert om.sendExpression('"HelloWorld!"', parsed=True) == 'HelloWorld!' + + +def test_omcprocessport_execute(om): + port = om.omc_process.get_port() + omcp = OMPython.OMCProcessPort(omc_port=port) + + # run 1 + om1 = OMPython.OMCSessionZMQ(omc_process=omcp) + assert om1.sendExpression('"HelloWorld!"', parsed=False) == '"HelloWorld!"\n' + + # run 2 + om2 = OMPython.OMCSessionZMQ(omc_process=omcp) + assert om2.sendExpression('"HelloWorld!"', parsed=False) == '"HelloWorld!"\n' + + del om1 + del om2 + + +def test_omcprocessport_simulate(om, model_time_str): + port = om.omc_process.get_port() + omcp = OMPython.OMCProcessPort(omc_port=port) + + om = OMPython.OMCSessionZMQ(omc_process=omcp) + assert om.sendExpression(f'loadString("{model_time_str}")') is True + om.sendExpression('res:=simulate(M, stopTime=2.0)') + assert om.sendExpression('res.resultFile') != "" + del om diff --git a/tests/test_docker.py b/tests/test_docker.py index 540d123aa..88687e071 100644 --- a/tests/test_docker.py +++ b/tests/test_docker.py @@ -4,12 +4,23 @@ @pytest.mark.skip(reason="This test would fail") def test_docker(): - om = OMPython.OMCSessionZMQ(docker="openmodelica/openmodelica:v1.16.1-minimal") + omcp = OMPython.OMCProcessDocker(docker="openmodelica/openmodelica:v1.16.1-minimal") + om = OMPython.OMCSessionZMQ(omc_process=omcp) assert om.sendExpression("getVersion()") == "OpenModelica 1.16.1" - omInner = OMPython.OMCSessionZMQ(dockerContainer=om._dockerCid) + + omcpInner = OMPython.OMCProcessDockerContainer(dockerContainer=omcp.get_docker_container_id()) + omInner = OMPython.OMCSessionZMQ(omc_process=omcpInner) assert omInner.sendExpression("getVersion()") == "OpenModelica 1.16.1" - om2 = OMPython.OMCSessionZMQ(docker="openmodelica/openmodelica:v1.16.1-minimal", port=11111) + + omcp2 = OMPython.OMCProcessDocker(docker="openmodelica/openmodelica:v1.16.1-minimal", port=11111) + om2 = OMPython.OMCSessionZMQ(omc_process=omcp2) assert om2.sendExpression("getVersion()") == "OpenModelica 1.16.1" + + del omcp2 del om2 + + del omcpInner del omInner + + del omcp del om