diff --git a/OMPython/__init__.py b/OMPython/__init__.py index 16f6af94c..7cde7d3a6 100644 --- a/OMPython/__init__.py +++ b/OMPython/__init__.py @@ -17,7 +17,6 @@ import platform import psutil import re -import shlex import signal import subprocess import sys @@ -137,17 +136,17 @@ def ask(self, question, opt=None, parsed=True): return self.omc_cache[p] if opt: - expression = '{0}({1})'.format(question, opt) + expression = f'{question}({opt})' else: expression = question - logger.debug('OMC ask: {0} - parsed: {1}'.format(expression, parsed)) + logger.debug('OMC ask: %s - parsed: %s', expression, parsed) try: res = self.sendExpression(expression, parsed=parsed) - except Exception as e: - logger.error("OMC failed: {0}, {1}, parsed={2}".format(question, opt, parsed)) - raise e + except Exception: + logger.error("OMC failed: %s, %s, parsed=%s", question, opt, parsed) + raise # save response self.omc_cache[p] = res @@ -156,7 +155,7 @@ def ask(self, question, opt=None, parsed=True): # TODO: Open Modelica Compiler API functions. Would be nice to generate these. def loadFile(self, filename): - return self.ask('loadFile', '"{0}"'.format(filename)) + return self.ask('loadFile', f'"{filename}"') def loadModel(self, className): return self.ask('loadModel', className) @@ -207,7 +206,7 @@ def getDerivedClassModifierNames(self, className): return self.ask('getDerivedClassModifierNames', className) def getDerivedClassModifierValue(self, className, modifierName): - return self.ask('getDerivedClassModifierValue', '{0}, {1}'.format(className, modifierName)) + return self.ask('getDerivedClassModifierValue', f'{className}, {modifierName}') def typeNameStrings(self, className): return self.ask('typeNameStrings', className) @@ -219,79 +218,79 @@ def getClassComment(self, className): try: return self.ask('getClassComment', className) except pyparsing.ParseException as ex: - logger.warning("Method 'getClassComment' failed for {0}".format(className)) - logger.warning('OMTypedParser error: {0}'.format(ex.message)) + logger.warning("Method 'getClassComment' failed for %s", className) + logger.warning('OMTypedParser error: %s', ex.message) return 'No description available' def getNthComponent(self, className, comp_id): """ returns with (type, name, description) """ - return self.ask('getNthComponent', '{0}, {1}'.format(className, comp_id)) + return self.ask('getNthComponent', f'{className}, {comp_id}') def getNthComponentAnnotation(self, className, comp_id): - return self.ask('getNthComponentAnnotation', '{0}, {1}'.format(className, comp_id)) + return self.ask('getNthComponentAnnotation', f'{className}, {comp_id}') def getImportCount(self, className): return self.ask('getImportCount', className) def getNthImport(self, className, importNumber): # [Path, id, kind] - return self.ask('getNthImport', '{0}, {1}'.format(className, importNumber)) + return self.ask('getNthImport', f'{className}, {importNumber}') def getInheritanceCount(self, className): return self.ask('getInheritanceCount', className) def getNthInheritedClass(self, className, inheritanceDepth): - return self.ask('getNthInheritedClass', '{0}, {1}'.format(className, inheritanceDepth)) + return self.ask('getNthInheritedClass', f'{className}, {inheritanceDepth}') def getParameterNames(self, className): try: return self.ask('getParameterNames', className) except KeyError as ex: - logger.warning('OMPython error: {0}'.format(ex)) + logger.warning('OMPython error: %s', ex) # FIXME: OMC returns with a different structure for empty parameter set return [] def getParameterValue(self, className, parameterName): try: - return self.ask('getParameterValue', '{0}, {1}'.format(className, parameterName)) + return self.ask('getParameterValue', f'{className}, {parameterName}') except pyparsing.ParseException as ex: - logger.warning('OMTypedParser error: {0}'.format(ex.message)) + logger.warning('OMTypedParser error: %s', ex.message) return "" def getComponentModifierNames(self, className, componentName): - return self.ask('getComponentModifierNames', '{0}, {1}'.format(className, componentName)) + return self.ask('getComponentModifierNames', f'{className}, {componentName}') def getComponentModifierValue(self, className, componentName): try: # FIXME: OMPython exception UnboundLocalError exception for 'Modelica.Fluid.Machines.ControlledPump' - return self.ask('getComponentModifierValue', '{0}, {1}'.format(className, componentName)) + return self.ask('getComponentModifierValue', f'{className}, {componentName}') except pyparsing.ParseException as ex: - logger.warning('OMTypedParser error: {0}'.format(ex.message)) - result = self.ask('getComponentModifierValue', '{0}, {1}'.format(className, componentName), parsed=False) + logger.warning('OMTypedParser error: %s', ex.message) + result = self.ask('getComponentModifierValue', f'{className}, {componentName}', parsed=False) try: answer = OMParser.check_for_values(result) OMParser.result = {} return answer[2:] except (TypeError, UnboundLocalError) as ex: - logger.warning('OMParser error: {0}'.format(ex)) + logger.warning('OMParser error: %s', ex) return result def getExtendsModifierNames(self, className, componentName): - return self.ask('getExtendsModifierNames', '{0}, {1}'.format(className, componentName)) + return self.ask('getExtendsModifierNames', f'{className}, {componentName}') def getExtendsModifierValue(self, className, extendsName, modifierName): try: # FIXME: OMPython exception UnboundLocalError exception for 'Modelica.Fluid.Machines.ControlledPump' - return self.ask('getExtendsModifierValue', '{0}, {1}, {2}'.format(className, extendsName, modifierName)) + return self.ask('getExtendsModifierValue', f'{className}, {extendsName}, {modifierName}') except pyparsing.ParseException as ex: - logger.warning('OMTypedParser error: {0}'.format(ex.message)) - result = self.ask('getExtendsModifierValue', '{0}, {1}, {2}'.format(className, extendsName, modifierName), parsed=False) + logger.warning('OMTypedParser error: %s', ex.message) + result = self.ask('getExtendsModifierValue', f'{className}, {extendsName}, {modifierName}', parsed=False) try: answer = OMParser.check_for_values(result) OMParser.result = {} return answer[2:] except (TypeError, UnboundLocalError) as ex: - logger.warning('OMParser error: {0}'.format(ex)) + logger.warning('OMParser error: %s', ex) return result def getNthComponentModification(self, className, comp_id): @@ -299,7 +298,7 @@ def getNthComponentModification(self, className, comp_id): # get {$Code(....)} field # \{\$Code\((\S*\s*)*\)\} - value = self.ask('getNthComponentModification', '{0}, {1}'.format(className, comp_id), parsed=False) + value = self.ask('getNthComponentModification', f'{className}, {comp_id}', parsed=False) value = value.replace("{$Code(", "") return value[:-3] # return self.re_Code.findall(value) @@ -315,16 +314,15 @@ def getNthComponentModification(self, className, comp_id): # end getClassNames; def getClassNames(self, className=None, recursive=False, qualified=False, sort=False, builtin=False, showProtected=False): - if className: - value = self.ask('getClassNames', - '{0}, recursive={1}, qualified={2}, sort={3}, builtin={4}, showProtected={5}'.format( - className, str(recursive).lower(), str(qualified).lower(), str(sort).lower(), - str(builtin).lower(), str(showProtected).lower())) - else: - value = self.ask('getClassNames', - 'recursive={0}, qualified={1}, sort={2}, builtin={3}, showProtected={4}'.format( - str(recursive).lower(), str(qualified).lower(), str(sort).lower(), - str(builtin).lower(), str(showProtected).lower())) + value = self.ask( + 'getClassNames', + (f'{className}, ' if className else '') + + f'recursive={str(recursive).lower()}, ' + f'qualified={str(qualified).lower()}, ' + f'sort={str(sort).lower()}, ' + f'builtin={str(builtin).lower()}, ' + f'showProtected={str(showProtected).lower()}' + ) return value @@ -378,7 +376,7 @@ def __init__(self, readonly=False, timeout=10.00, self._set_omc_command([ "--interactive=zmq", "--locale=C", - "-z={0}".format(self._random_string) + f"-z={self._random_string}" ]) # start up omc executable, which is waiting for the ZMQ connection self._start_omc_process(timeout) @@ -391,38 +389,25 @@ def __del__(self): except Exception: pass self._omc_log_file.close() - if sys.version_info.major >= 3: - try: - self._omc_process.wait(timeout=2.0) - except Exception: - if self._omc_process: - self._omc_process.kill() - else: - for i in range(0, 100): - time.sleep(0.02) - if self._omc_process and (self._omc_process.poll() is not None): - break - # kill self._omc_process process if it is still running/exists - if self._omc_process is not None and self._omc_process.returncode is None: - logger.warning("OMC did not exit after being sent the quit() command; killing the process with pid=%s" % str(self._omc_process.pid)) - if sys.platform == "win32": - self._omc_process.kill() - self._omc_process.wait() - else: - os.killpg(os.getpgid(self._omc_process.pid), signal.SIGTERM) + try: + self._omc_process.wait(timeout=2.0) + except Exception: + if self._omc_process: + print("OMC did not exit after being sent the quit() command; killing the process with pid={self._omc_process.pid}") self._omc_process.kill() self._omc_process.wait() def _create_omc_log_file(self, suffix): if sys.platform == 'win32': - self._omc_log_file = open(os.path.join(self._temp_dir, "openmodelica.{0}.{1}.log".format(suffix, self._random_string)), 'w') + log_filename = f"openmodelica.{suffix}.{self._random_string}.log" else: - # this file must be closed in the destructor - self._omc_log_file = open(os.path.join(self._temp_dir, "openmodelica.{0}.{1}.{2}.log".format(self._currentUser, suffix, self._random_string)), 'w') + log_filename = f"openmodelica.{self._currentUser}.{suffix}.{self._random_string}.log" + # this file must be closed in the destructor + self._omc_log_file = open(pathlib.Path(self._temp_dir) / log_filename, "w+") def _start_omc_process(self, timeout): if sys.platform == 'win32': - omhome_bin = os.path.join(self.omhome, 'bin').replace("\\", "/") + omhome_bin = (self.omhome / "bin").as_posix() my_env = os.environ.copy() my_env["PATH"] = omhome_bin + os.pathsep + my_env["PATH"] self._omc_process = subprocess.Popen(self._omc_command, stdout=self._omc_log_file, @@ -431,9 +416,8 @@ def _start_omc_process(self, timeout): # set the user environment variable so omc running from wsgi has the same user as OMPython my_env = os.environ.copy() my_env["USER"] = self._currentUser - # Because we spawned a shell, and we need to be able to kill OMC, create a new process group for this - self._omc_process = subprocess.Popen(self._omc_command, shell=True, stdout=self._omc_log_file, - stderr=self._omc_log_file, env=my_env, preexec_fn=os.setsid) + self._omc_process = subprocess.Popen(self._omc_command, stdout=self._omc_log_file, + stderr=self._omc_log_file, env=my_env) if self._docker: for i in range(0, 40): try: @@ -468,13 +452,14 @@ def _start_omc_process(self, timeout): self._omc_process = DummyPopen(int(columns[1])) except psutil.NoSuchProcess: raise Exception( - "Could not find PID %s - is this a docker instance spawned without --pid=host?\n" - "Log-file says:\n%s" % (self._random_string, open(self._omc_log_file.name).read())) + f"Could not find PID {dockerTop} - is this a docker instance spawned without --pid=host?\n" + f"Log-file says:\n{open(self._omc_log_file.name).read()}") break if self._omc_process is not None: break time.sleep(timeout / 40.0) if self._omc_process is None: + raise Exception("Docker top did not contain omc process %s:\n%s\nLog-file says:\n%s" % (self._random_string, dockerTop, open(self._omc_log_file.name).read())) return self._omc_process @@ -517,43 +502,33 @@ def _set_omc_command(self, omc_path_and_args_list): omcCommand = ["docker", "exec", "--env", "USER=%s" % self._currentUser, "--user", str(self._getuid())] + self._dockerExtraArgs + [self._dockerContainer, self._dockerOpenModelicaPath] self._dockerCid = self._dockerContainer else: - omcCommand = [self._get_omc_path()] + omcCommand = [str(self._get_omc_path())] if self._interactivePort: extraFlags = extraFlags + ["--interactivePort=%d" % int(self._interactivePort)] - omc_path_and_args_list = omcCommand + omc_path_and_args_list + extraFlags - - if sys.platform == 'win32': - self._omc_command = omc_path_and_args_list - else: - self._omc_command = ' '.join([shlex.quote(a) if (sys.version_info > (3, 0)) else a for a in omc_path_and_args_list]) + self._omc_command = omcCommand + omc_path_and_args_list + extraFlags return self._omc_command def _get_omhome(self, omhome: str = None): # use the provided path if omhome is not None: - return omhome + return pathlib.Path(omhome) # check the environment variable omhome = os.environ.get('OPENMODELICAHOME') if omhome is not None: - return omhome + return pathlib.Path(omhome) # Get the path to the OMC executable, if not installed this will be None path_to_omc = shutil.which("omc") if path_to_omc is not None: - return os.path.dirname(os.path.dirname(path_to_omc)) + return pathlib.Path(path_to_omc).parents[1] raise ValueError("Cannot find OpenModelica executable, please install from openmodelica.org") - def _get_omc_path(self): - try: - return os.path.join(self.omhome, 'bin', 'omc') - except BaseException: - logger.error("The OpenModelica compiler is missing in the System path (%s), please install it" - % os.path.join(self.omhome, 'bin', 'omc')) - raise + def _get_omc_path(self) -> pathlib.Path: + return self.omhome / "bin" / "omc" def _connect_to_omc(self, timeout): self._omc_zeromq_uri = "file:///" + self._port_file @@ -563,7 +538,7 @@ def _connect_to_omc(self, timeout): while True: if self._dockerCid: try: - self._port = subprocess.check_output(["docker", "exec", self._dockerCid, "cat", self._port_file], stderr=subprocess.DEVNULL if (sys.version_info > (3, 0)) else subprocess.STDOUT).decode().strip() + self._port = subprocess.check_output(["docker", "exec", self._dockerCid, "cat", self._port_file], stderr=subprocess.DEVNULL).decode().strip() break except Exception: pass @@ -580,11 +555,11 @@ def _connect_to_omc(self, timeout): name = self._omc_log_file.name self._omc_log_file.close() logger.error("OMC Server did not start. Please start it! Log-file says:\n%s" % open(name).read()) - raise Exception("OMC Server did not start (timeout=%f). Could not open file %s" % (timeout, self._port_file)) + raise Exception(f"OMC Server did not start (timeout={timeout}). Could not open file {self._port_file}") time.sleep(timeout / 80.0) self._port = self._port.replace("0.0.0.0", self._serverIPAddress) - logger.info("OMC Server is up and running at {0} pid={1} cid={2}".format(self._omc_zeromq_uri, self._omc_process.pid, self._dockerCid)) + logger.info(f"OMC Server is up and running at {self._omc_zeromq_uri} pid={self._omc_process.pid} cid={self._dockerCid}") # Create the ZeroMQ socket and connect to OMC server context = zmq.Context.instance() @@ -594,36 +569,36 @@ def _connect_to_omc(self, timeout): self._omc.connect(self._port) def sendExpression(self, command, parsed=True): - # check for process is running - p = self._omc_process.poll() - if p is None: - attempts = 0 - while True: - try: - self._omc.send_string(str(command), flags=zmq.NOBLOCK) - break - except zmq.error.Again: - pass - attempts += 1 - if attempts == 50.0: - name = self._omc_log_file.name - self._omc_log_file.close() - raise Exception("No connection with OMC (timeout=%f). Log-file says: \n%s" % (self._timeout, open(name).read())) - time.sleep(self._timeout / 50.0) - if command == "quit()": - self._omc.close() - self._omc = None - return None - else: - result = self._omc.recv_string() - if parsed is True: - answer = OMTypedParser.parseString(result) - return answer - else: - return result - else: + p = self._omc_process.poll() # check if process is running + if p is not None: raise Exception("Process Exited, No connection with OMC. Create a new instance of OMCSessionZMQ") + attempts = 0 + while True: + try: + self._omc.send_string(str(command), flags=zmq.NOBLOCK) + break + except zmq.error.Again: + pass + attempts += 1 + if attempts >= 50: + self._omc_log_file.seek(0) + log = self._omc_log_file.read() + self._omc_log_file.close() + raise Exception(f"No connection with OMC (timeout={self._timeout}). Log-file says: \n{log}") + time.sleep(self._timeout / 50.0) + if command == "quit()": + self._omc.close() + self._omc = None + return None + else: + result = self._omc.recv_string() + if parsed is True: + answer = OMTypedParser.parseString(result) + return answer + else: + return result + class ModelicaSystemError(Exception): pass @@ -644,7 +619,6 @@ def __init__(self, fileName=None, modelName=None, lmodel=None, commandLineOption """ if fileName is None and modelName is None and not lmodel: # all None raise Exception("Cannot create ModelicaSystem object without any arguments") - return self.tree = None self.quantitiesList = [] @@ -699,8 +673,8 @@ def __init__(self, fileName=None, modelName=None, lmodel=None, commandLineOption # set default command Line Options for linearization as # linearize() will use the simulation executable and runtime # flag -l to perform linearization - self.sendExpression("setCommandLineOptions(\"--linearizationDumpLanguage=python\")") - self.sendExpression("setCommandLineOptions(\"--generateSymbolicLinearization\")") + self.setCommandLineOptions("--linearizationDumpLanguage=python") + self.setCommandLineOptions("--generateSymbolicLinearization") self.setTempDirectory(customBuildDirectory) @@ -716,11 +690,11 @@ def __init__(self, fileName=None, modelName=None, lmodel=None, commandLineOption def setCommandLineOptions(self, commandLineOptions: str): # set commandLineOptions if provided by users - if commandLineOptions is not None: - exp = "".join(["setCommandLineOptions(", "\"", commandLineOptions, "\"", ")"]) - cmdexp = self.sendExpression(exp) - if not cmdexp: - self._check_error() + if commandLineOptions is None: + return + exp = f'setCommandLineOptions("{commandLineOptions}")' + if not self.sendExpression(exp): + self._check_error() def loadFile(self): # load file @@ -742,16 +716,16 @@ def loadLibrary(self): result = self.requestApi(apiCall, element) elif isinstance(element, tuple): if not element[1]: - libname = "".join(["loadModel(", element[0], ")"]) + libname = f"loadModel({element[0]})" else: - libname = "".join(["loadModel(", element[0], ", ", "{", "\"", element[1], "\"", "}", ")"]) + libname = f'loadModel({element[0]}, {{"{element[1]}"}})' result = self.sendExpression(libname) else: - raise ModelicaSystemError("loadLibrary() failed, Unknown type detected: " + - "{} is of type {}, ".format(element, type(element)) + - "The following patterns are supported:\n" + - "1)[\"Modelica\"]\n" + - "2)[(\"Modelica\",\"3.2.3\"), \"PowerSystems\"]\n") + raise ModelicaSystemError("loadLibrary() failed, Unknown type detected: " + f"{element} is of type {type(element)}, " + "The following patterns are supported:\n" + '1)["Modelica"]\n' + '2)[("Modelica","3.2.3"), "PowerSystems"]\n') # Show notification or warnings to the user when verbose=True OR if some error occurred i.e., not result if self._verbose or not result: self._check_error() @@ -767,22 +741,22 @@ def setTempDirectory(self, customBuildDirectory): if not os.path.exists(self.tempdir): raise IOError(self.tempdir, " cannot be created") - logger.info("Define tempdir as {}".format(self.tempdir)) - exp = "".join(["cd(", "\"", self.tempdir, "\"", ")"]).replace("\\", "/") + logger.info("Define tempdir as %s", self.tempdir) + exp = f'cd("{pathlib.Path(self.tempdir).as_posix()}")' self.sendExpression(exp) def getWorkDirectory(self): return self.tempdir def _run_cmd(self, cmd: list): - logger.debug("Run OM command {} in {}".format(cmd, self.tempdir)) + logger.debug("Run OM command %s in %s", cmd, self.tempdir) if platform.system() == "Windows": dllPath = "" # set the process environment from the generated .bat file in windows which should have all the dependencies - batFilePath = os.path.join(self.tempdir, '{}.{}'.format(self.modelName, "bat")).replace("\\", "/") - if (not os.path.exists(batFilePath)): + batFilePath = pathlib.Path(self.tempdir) / f"{self.modelName}.bat" + if not batFilePath.exists(): ModelicaSystemError("Batch file (*.bat) does not exist " + batFilePath) with open(batFilePath, 'r') as file: @@ -796,35 +770,31 @@ def _run_cmd(self, cmd: list): # TODO: how to handle path to resources of external libraries for any system not Windows? my_env = None - currentDir = os.getcwd() try: - os.chdir(self.tempdir) - p = subprocess.Popen(cmd, env=my_env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + p = subprocess.Popen(cmd, env=my_env, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, cwd=self.tempdir) stdout, stderr = p.communicate() stdout = stdout.decode('ascii').strip() stderr = stderr.decode('ascii').strip() if stderr: - raise ModelicaSystemError("Error running command {}: {}".format(cmd, stderr)) + raise ModelicaSystemError(f"Error running command {cmd}: {stderr}") if self._verbose and stdout: - logger.info("OM output for command {}:\n{}".format(cmd, stdout)) + logger.info("OM output for command %s:\n%s", cmd, stdout) p.wait() p.terminate() - os.chdir(currentDir) except Exception as e: - os.chdir(currentDir) - raise ModelicaSystemError("Exception {} running command {}: {}".format(type(e), cmd, e)) + raise ModelicaSystemError(f"Exception {type(e)} running command {cmd}: {e}") def _check_error(self): errstr = self.sendExpression("getErrorString()") - if errstr is None or not errstr: + if not errstr: return - self._raise_error(errstr=errstr) def _raise_error(self, errstr: str): if self._raiseerrors: - raise ModelicaSystemError("OM error: {}".format(errstr)) + raise ModelicaSystemError(f"OM error: {errstr}") else: logger.error(errstr) @@ -833,17 +803,16 @@ def buildModel(self, variableFilter=None): self.variableFilter = variableFilter if self.variableFilter is not None: - varFilter = "variableFilter=" + "\"" + self.variableFilter + "\"" + varFilter = f'variableFilter="{self.variableFilter}"' else: - varFilter = "variableFilter=" + "\".*""\"" - logger.debug(varFilter) - # buildModelResult=self.sendExpression("buildModel("+ mName +")") + varFilter = 'variableFilter=".*"' + logger.debug("varFilter=%s", varFilter) buildModelResult = self.requestApi("buildModel", self.modelName, properties=varFilter) if self._verbose: - logger.info("OM model build result: {}".format(buildModelResult)) + logger.info("OM model build result: %s", buildModelResult) self._check_error() - self.xmlFile = os.path.join(os.path.dirname(buildModelResult[0]), buildModelResult[1]).replace("\\", "/") + self.xmlFile = pathlib.Path(buildModelResult[0]).parent / buildModelResult[1] self.xmlparse() def sendExpression(self, expr, parsed=True): @@ -852,76 +821,69 @@ def sendExpression(self, expr, parsed=True): # request to OMC def requestApi(self, apiName, entity=None, properties=None): # 2 - if (entity is not None and properties is not None): - exp = '{}({}, {})'.format(apiName, entity, properties) + if entity is not None and properties is not None: + exp = f'{apiName}({entity}, {properties})' elif entity is not None and properties is None: - if (apiName == "loadFile" or apiName == "importFMU"): - exp = '{}("{}")'.format(apiName, entity) + if apiName in ("loadFile", "importFMU"): + exp = f'{apiName}("{entity}")' else: - exp = '{}({})'.format(apiName, entity) + exp = f'{apiName}({entity})' else: - exp = '{}()'.format(apiName) + exp = f'{apiName}()' try: res = self.sendExpression(exp) except Exception as e: - errstr = "Exception {} raised: {}".format(type(e), e) - self._raise_error(errstr=errstr) + self._raise_error(errstr=f"Exception {type(e)} raised: {e}") res = None return res def xmlparse(self): - if (os.path.exists(self.xmlFile)): - self.tree = ET.parse(self.xmlFile) - self.root = self.tree.getroot() - rootCQ = self.root - for attr in rootCQ.iter('DefaultExperiment'): - self.simulateOptions["startTime"] = attr.get('startTime') - self.simulateOptions["stopTime"] = attr.get('stopTime') - self.simulateOptions["stepSize"] = attr.get('stepSize') - self.simulateOptions["tolerance"] = attr.get('tolerance') - self.simulateOptions["solver"] = attr.get('solver') - self.simulateOptions["outputFormat"] = attr.get('outputFormat') - - for sv in rootCQ.iter('ScalarVariable'): - scalar = {} - scalar["name"] = sv.get('name') - scalar["changeable"] = sv.get('isValueChangeable') - scalar["description"] = sv.get('description') - scalar["variability"] = sv.get('variability') - scalar["causality"] = sv.get('causality') - scalar["alias"] = sv.get('alias') - scalar["aliasvariable"] = sv.get('aliasVariable') - ch = list(sv) - start = None - min = None - max = None - unit = None - for att in ch: - start = att.get('start') - min = att.get('min') - max = att.get('max') - unit = att.get('unit') - scalar["start"] = start - scalar["min"] = min - scalar["max"] = max - scalar["unit"] = unit - - if (scalar["variability"] == "parameter"): - if scalar["name"] in self.overridevariables: - self.paramlist[scalar["name"]] = self.overridevariables[scalar["name"]] - else: - self.paramlist[scalar["name"]] = scalar["start"] - if (scalar["variability"] == "continuous"): - self.continuouslist[scalar["name"]] = scalar["start"] - if (scalar["causality"] == "input"): - self.inputlist[scalar["name"]] = scalar["start"] - if (scalar["causality"] == "output"): - self.outputlist[scalar["name"]] = scalar["start"] - - self.quantitiesList.append(scalar) - else: - errstr = "XML file not generated: " + self.xmlFile - self._raise_error(errstr=errstr) + if not self.xmlFile.exists(): + self._raise_error(errstr=f"XML file not generated: {self.xmlFile}") + return + + self.tree = ET.parse(self.xmlFile) + self.root = self.tree.getroot() + rootCQ = self.root + for attr in rootCQ.iter('DefaultExperiment'): + for key in ("startTime", "stopTime", "stepSize", "tolerance", + "solver", "outputFormat"): + self.simulateOptions[key] = attr.get(key) + + for sv in rootCQ.iter('ScalarVariable'): + scalar = {} + for key in ("name", "description", "variability", "causality", "alias"): + scalar[key] = sv.get(key) + scalar["changeable"] = sv.get('isValueChangeable') + scalar["aliasvariable"] = sv.get('aliasVariable') + ch = list(sv) + start = None + min = None + max = None + unit = None + for att in ch: + start = att.get('start') + min = att.get('min') + max = att.get('max') + unit = att.get('unit') + scalar["start"] = start + scalar["min"] = min + scalar["max"] = max + scalar["unit"] = unit + + if scalar["variability"] == "parameter": + if scalar["name"] in self.overridevariables: + self.paramlist[scalar["name"]] = self.overridevariables[scalar["name"]] + else: + self.paramlist[scalar["name"]] = scalar["start"] + if scalar["variability"] == "continuous": + self.continuouslist[scalar["name"]] = scalar["start"] + if scalar["causality"] == "input": + self.inputlist[scalar["name"]] = scalar["start"] + if scalar["causality"] == "output": + self.outputlist[scalar["name"]] = scalar["start"] + + self.quantitiesList.append(scalar) def getQuantities(self, names=None): # 3 """ @@ -933,7 +895,7 @@ def getQuantities(self, names=None): # 3 """ if names is None: return self.quantitiesList - elif (isinstance(names, str)): + elif isinstance(names, str): return [x for x in self.quantitiesList if x["name"] == names] elif isinstance(names, list): return [x for y in names for x in self.quantitiesList if x["name"] == y] @@ -960,18 +922,18 @@ def getContinuous(self, names=None): # 4 value = self.getSolutions(i) self.continuouslist[i] = value[0][-1] except Exception: - raise ModelicaSystemError("OM error: {} could not be computed".format(i)) + raise ModelicaSystemError(f"OM error: {i} could not be computed") return self.continuouslist - elif (isinstance(names, str)): + elif isinstance(names, str): if names in self.continuouslist: value = self.getSolutions(names) self.continuouslist[names] = value[0][-1] return [self.continuouslist.get(names)] else: - raise ModelicaSystemError("OM error: {} is not continuous".format(names)) + raise ModelicaSystemError(f"OM error: {names} is not continuous") - elif (isinstance(names, list)): + elif isinstance(names, list): valuelist = [] for i in names: if i in self.continuouslist: @@ -979,7 +941,7 @@ def getContinuous(self, names=None): # 4 self.continuouslist[i] = value[0][-1] valuelist.append(value[0][-1]) else: - raise ModelicaSystemError("OM error: {} is not continuous".format(i)) + raise ModelicaSystemError(f"OM error: {i} is not continuous") return valuelist def getParameters(self, names=None): # 5 @@ -993,9 +955,9 @@ def getParameters(self, names=None): # 5 """ if names is None: return self.paramlist - elif (isinstance(names, str)): + elif isinstance(names, str): return [self.paramlist.get(names, "NotExist")] - elif (isinstance(names, list)): + elif isinstance(names, list): return ([self.paramlist.get(x, "NotExist") for x in names]) def getlinearParameters(self, names=None): # 5 @@ -1004,12 +966,12 @@ def getlinearParameters(self, names=None): # 5 If *name is None then the function will return dict which contain all parameter names as key and value as corresponding values. eg., getParameters() Otherwise variable number of arguments can be passed as parameter name in string format separated by commas. eg., getParameters('paraName1', 'paraName2') """ - if (names == 0): + if names is None: return self.linearparameters - elif (isinstance(names, str)): + elif isinstance(names, str): return [self.linearparameters.get(names, "NotExist")] else: - return ([self.linearparameters.get(x, "NotExist") for x in names]) + return [self.linearparameters.get(x, "NotExist") for x in names] def getInputs(self, names=None): # 6 """ @@ -1019,9 +981,9 @@ def getInputs(self, names=None): # 6 """ if names is None: return self.inputlist - elif (isinstance(names, str)): + elif isinstance(names, str): return [self.inputlist.get(names, "NotExist")] - elif (isinstance(names, list)): + elif isinstance(names, list): return ([self.inputlist.get(x, "NotExist") for x in names]) def getOutputs(self, names=None): # 7 @@ -1036,7 +998,7 @@ def getOutputs(self, names=None): # 7 if not self.simulationFlag: if names is None: return self.outputlist - elif (isinstance(names, str)): + elif isinstance(names, str): return [self.outputlist.get(names, "NotExist")] else: return ([self.outputlist.get(x, "NotExist") for x in names]) @@ -1046,14 +1008,14 @@ def getOutputs(self, names=None): # 7 value = self.getSolutions(i) self.outputlist[i] = value[0][-1] return self.outputlist - elif (isinstance(names, str)): + elif isinstance(names, str): if names in self.outputlist: value = self.getSolutions(names) self.outputlist[names] = value[0][-1] return [self.outputlist.get(names)] else: return (names, " is not Output") - elif (isinstance(names, list)): + elif isinstance(names, list): valuelist = [] for i in names: if i in self.outputlist: @@ -1075,9 +1037,9 @@ def getSimulationOptions(self, names=None): # 8 """ if names is None: return self.simulateOptions - elif (isinstance(names, str)): + elif isinstance(names, str): return [self.simulateOptions.get(names, "NotExist")] - elif (isinstance(names, list)): + elif isinstance(names, list): return ([self.simulateOptions.get(x, "NotExist") for x in names]) def getLinearizationOptions(self, names=None): # 9 @@ -1091,9 +1053,9 @@ def getLinearizationOptions(self, names=None): # 9 """ if names is None: return self.linearOptions - elif (isinstance(names, str)): + elif isinstance(names, str): return [self.linearOptions.get(names, "NotExist")] - elif (isinstance(names, list)): + elif isinstance(names, list): return ([self.linearOptions.get(x, "NotExist") for x in names]) def getOptimizationOptions(self, names=None): # 10 @@ -1105,12 +1067,18 @@ def getOptimizationOptions(self, names=None): # 10 """ if names is None: return self.optimizeOptions - elif (isinstance(names, str)): + elif isinstance(names, str): return [self.optimizeOptions.get(names, "NotExist")] - elif (isinstance(names, list)): + elif isinstance(names, list): return ([self.optimizeOptions.get(x, "NotExist") for x in names]) - # to simulate or re-simulate model + def get_exe_file(self) -> pathlib.Path: + """Get path to model executable.""" + if platform.system() == "Windows": + return pathlib.Path(self.tempdir) / f"{self.modelName}.exe" + else: + return pathlib.Path(self.tempdir) / self.modelName + def simulate(self, resultfile=None, simflags=None): # 11 """ This method simulates model according to the simulation options. @@ -1119,39 +1087,35 @@ def simulate(self, resultfile=None, simflags=None): # 11 >>> simulate(resultfile="a.mat") >>> simulate(simflags="-noEventEmit -noRestart -override=e=0.3,g=10") # set runtime simulation flags """ - if (resultfile is None): + if resultfile is None: r = "" - self.resultfile = os.path.join(self.tempdir, self.modelName + "_res.mat").replace("\\", "/") + self.resultfile = (pathlib.Path(self.tempdir) / f"{self.modelName}_res.mat").as_posix() else: if os.path.exists(resultfile): - r = " -r=" + resultfile self.resultfile = resultfile else: - r = " -r=" + os.path.join(self.tempdir, resultfile).replace("\\", "/") - self.resultfile = os.path.join(self.tempdir, resultfile).replace("\\", "/") + self.resultfile = (pathlib.Path(self.tempdir) / resultfile).as_posix() + r = " -r=" + self.resultfile # allow runtime simulation flags from user input - if (simflags is None): + if simflags is None: simflags = "" else: simflags = " " + simflags - overrideFile = os.path.join(self.tempdir, '{}.{}'.format(self.modelName + "_override", "txt")).replace("\\", - "/") - if (self.overridevariables or self.simoptionsoverride): + overrideFile = pathlib.Path(self.tempdir) / f"{self.modelName}_override.txt" + if self.overridevariables or self.simoptionsoverride: tmpdict = self.overridevariables.copy() tmpdict.update(self.simoptionsoverride) # write to override file - file = open(overrideFile, "w") - for (key, value) in tmpdict.items(): - name = key + "=" + value + "\n" - file.write(name) - file.close() - override = " -overrideFile=" + overrideFile + with open(overrideFile, "w") as file: + for key, value in tmpdict.items(): + file.write(f"{key}={value}\n") + override = " -overrideFile=" + overrideFile.as_posix() else: override = "" - if (self.inputFlag): # if model has input quantities + if self.inputFlag: # if model has input quantities for i in self.inputlist: val = self.inputlist[i] if val is None: @@ -1160,15 +1124,11 @@ def simulate(self, resultfile=None, simflags=None): # 11 self.inputlist[i] = [(float(self.simulateOptions["startTime"]), 0.0), (float(self.simulateOptions["stopTime"]), 0.0)] if float(self.simulateOptions["startTime"]) != val[0][0]: - errstr = "!!! startTime not matched for Input {}".format(i) + errstr = f"!!! startTime not matched for Input {i}" self._raise_error(errstr=errstr) return if float(self.simulateOptions["stopTime"]) != val[-1][0]: - errstr = "!!! stopTime not matched for Input {}".format(i) - self._raise_error(errstr=errstr) - return - if val[0][0] < float(self.simulateOptions["startTime"]): - errstr = "Input time value is less than simulation startTime for inputs {}".format(i) + errstr = f"!!! stopTime not matched for Input {i}" self._raise_error(errstr=errstr) return self.createCSVData() # create csv file @@ -1176,19 +1136,14 @@ def simulate(self, resultfile=None, simflags=None): # 11 else: csvinput = "" - if (platform.system() == "Windows"): - getExeFile = os.path.join(self.tempdir, '{}.{}'.format(self.modelName, "exe")).replace("\\", "/") - else: - getExeFile = os.path.join(self.tempdir, self.modelName).replace("\\", "/") - - if os.path.exists(getExeFile): - cmd = getExeFile + override + csvinput + r + simflags - cmd = cmd.split(" ") - self._run_cmd(cmd=cmd) + exe_file = self.get_exe_file() + if not exe_file.exists(): + raise Exception(f"Error: Application file path not found: {exe_file}") - self.simulationFlag = True - else: - raise Exception("Error: Application file path not found: " + getExeFile) + cmd = exe_file.as_posix() + override + csvinput + r + simflags + cmd = cmd.split(" ") + self._run_cmd(cmd=cmd) + self.simulationFlag = True # to extract simulation results def getSolutions(self, varList=None, resultfile=None): # 12 @@ -1209,48 +1164,40 @@ def getSolutions(self, varList=None, resultfile=None): # 12 resFile = resultfile # check for result file exits - if (not os.path.exists(resFile)): - errstr = "Error: Result file does not exist {}".format(resFile) + if not os.path.exists(resFile): + errstr = f"Error: Result file does not exist {resFile}" self._raise_error(errstr=errstr) return - # exit() - else: - resultVars = self.sendExpression("readSimulationResultVars(\"" + resFile + "\")") + resultVars = self.sendExpression(f'readSimulationResultVars("{resFile}")') + self.sendExpression("closeSimulationResultFile()") + if varList is None: + return resultVars + elif isinstance(varList, str): + if varList not in resultVars and varList != "time": + self._raise_error(errstr=f'!!! {varList} does not exist') + return + res = self.sendExpression(f'readSimulationResult("{resFile}", {{{varList}}})') + npRes = np.array(res) self.sendExpression("closeSimulationResultFile()") - if varList is None: - return resultVars - elif (isinstance(varList, str)): - if (varList not in resultVars and varList != "time"): - errstr = '!!! ' + varList + ' does not exist' - self._raise_error(errstr=errstr) + return npRes + elif isinstance(varList, list): + # varList, = varList + for v in varList: + if v == "time": + continue + if v not in resultVars: + self._raise_error(errstr=f'!!! {v} does not exist') return - exp = "readSimulationResult(\"" + resFile + '",{' + varList + "})" - res = self.sendExpression(exp) - npRes = np.array(res) - exp2 = "closeSimulationResultFile()" - self.sendExpression(exp2) - return npRes - elif (isinstance(varList, list)): - # varList, = varList - for v in varList: - if v == "time": - continue - if v not in resultVars: - errstr = '!!! ' + v + ' does not exist' - self._raise_error(errstr=errstr) - return - variables = ",".join(varList) - exp = "readSimulationResult(\"" + resFile + '",{' + variables + "})" - res = self.sendExpression(exp) - npRes = np.array(res) - exp2 = "closeSimulationResultFile()" - self.sendExpression(exp2) - return npRes + variables = ",".join(varList) + res = self.sendExpression(f'readSimulationResult("{resFile}",{{{variables}}})') + npRes = np.array(res) + self.sendExpression("closeSimulationResultFile()") + return npRes def strip_space(self, name): - if (isinstance(name, str)): + if isinstance(name, str): return name.replace(" ", "") - elif (isinstance(name, list)): + elif isinstance(name, list): return [x.replace(" ", "") for x in name] def setMethodHelper(self, args1, args2, args3, args4=None): @@ -1277,14 +1224,13 @@ def apply_single(args1): return True else: - errstr = "\"" + value[0] + "\"" + " is not a" + args3 + " variable" - self._raise_error(errstr=errstr) + self._raise_error(errstr=f'"{value[0]}" is not a {args3} variable') result = [] - if (isinstance(args1, str)): + if isinstance(args1, str): result = [apply_single(args1)] - elif (isinstance(args1, list)): + elif isinstance(args1, list): result = [] args1 = self.strip_space(args1) for var in args1: @@ -1316,10 +1262,10 @@ def isParameterChangeable(self, name, value): q = self.getQuantities(name) if (q[0]["changeable"] == "false"): if self._verbose: - logger.info("setParameters() failed : It is not possible to set " + - "the following signal \"{}\", ".format(name) + "It seems to be structural, final, " + - "protected or evaluated or has a non-constant binding, use sendExpression(" + - "setParameterValue({}, {}, {}), ".format(self.modelName, name, value) + + logger.info("setParameters() failed : It is not possible to set " + f'the following signal "{name}", It seems to be structural, final, ' + "protected or evaluated or has a non-constant binding, use sendExpression(" + f"setParameterValue({self.modelName}, {name}, {value}), " "parsed=false) and rebuild the model using buildModel() API") return False return True @@ -1362,22 +1308,22 @@ def setInputs(self, name): # 15 >>> setInputs("Name=value") >>> setInputs(["Name1=value1","Name2=value2"]) """ - if (isinstance(name, str)): + if isinstance(name, str): name = self.strip_space(name) value = name.split("=") if value[0] in self.inputlist: tmpvalue = eval(value[1]) - if (isinstance(tmpvalue, int) or isinstance(tmpvalue, float)): + if isinstance(tmpvalue, int) or isinstance(tmpvalue, float): self.inputlist[value[0]] = [(float(self.simulateOptions["startTime"]), float(value[1])), (float(self.simulateOptions["stopTime"]), float(value[1]))] - elif (isinstance(tmpvalue, list)): + elif isinstance(tmpvalue, list): self.checkValidInputs(tmpvalue) self.inputlist[value[0]] = tmpvalue self.inputFlag = True else: errstr = value[0] + " is not an input" self._raise_error(errstr=errstr) - elif (isinstance(name, list)): + elif isinstance(name, list): name = self.strip_space(name) for var in name: value = var.split("=") @@ -1403,19 +1349,19 @@ def checkValidInputs(self, name): if l[0] < float(self.simulateOptions["startTime"]): ModelicaSystemError('Input time value is less than simulation startTime') if len(l) != 2: - ModelicaSystemError('Value for ' + l + ' is in incorrect format!') + ModelicaSystemError(f'Value for {l} is in incorrect format!') else: ModelicaSystemError('Error!!! Value must be in tuple format') # To create csv file for inputs def createCSVData(self): - sl = list() # Actual timestamps + sl = [] # Actual timestamps skip = False # check for NONE in input list and replace with proper data (e.g) [(startTime, 0.0), (stopTime, 0.0)] tmpinputlist = {} - for (key, value) in self.inputlist.items(): - if (value is None): + for key, value in self.inputlist.items(): + if value is None: tmpinputlist[key] = [(float(self.simulateOptions["startTime"]), 0.0), (float(self.simulateOptions["stopTime"]), 0.0)] else: @@ -1426,7 +1372,7 @@ def createCSVData(self): for i in inp: cl = list() el = list() - for (t, x) in i: + for t, x in i: cl.append(t) for i in cl: if skip is True: @@ -1455,7 +1401,7 @@ def createCSVData(self): inpSortedList.append(sortedList) for i in inpSortedList: ind = 0 - for (t, x) in i: + for t, x in i: if x == '?': t1 = i[ind - 1][0] u1 = i[ind - 1][1] @@ -1498,21 +1444,18 @@ def createCSVData(self): templist.append(x) interpolated_inputs_all.append(templist) - name_ = 'time' - # name = ','.join(self.__getInputNames()) name = ','.join(list(self.inputlist.keys())) - name = '{},{},{}'.format(name_, name, 'end') + name = f'time,{name},end' a = '' l = [] l.append(name) for i in range(0, len(sl)): - a = ("%s,%s" % (str(float(sl[i])), ",".join(list(str(float(inppp[i])) - for inppp in interpolated_inputs_all)))) + ',0' + a = f'{float(sl[i])},{",".join(str(float(inppp[i])) for inppp in interpolated_inputs_all)},0' l.append(a) - self.csvFile = os.path.join(self.tempdir, '{}.{}'.format(self.modelName, "csv")).replace("\\", "/") - with open(self.csvFile, "w") as f: + self.csvFile = (pathlib.Path(self.tempdir) / f'{self.modelName}.csv').as_posix() + with open(self.csvFile, "w", newline="") as f: writer = csv.writer(f, delimiter='\n') writer.writerow(l) f.close() @@ -1534,9 +1477,7 @@ def convertMo2Fmu(self, version="2.0", fmuType="me_cs", fileNamePrefix=">> optimize() """ cName = self.modelName - properties = ','.join("%s=%s" % (key, val) for (key, val) in list(self.optimizeOptions.items())) - self.sendExpression("setCommandLineOptions(\"-g=Optimica\")") + properties = ','.join(f"{key}={val}" for key, val in self.optimizeOptions.items()) + self.setCommandLineOptions("-g=Optimica") optimizeResult = self.requestApi('optimize', cName, properties) self._check_error() @@ -1591,19 +1532,15 @@ def linearize(self, lintime=None, simflags=None): # 22 raise IOError("Linearization cannot be performed as the model is not build, " "use ModelicaSystem() to build the model first") - overrideLinearFile = os.path.join(self.tempdir, - '{}.{}'.format(self.modelName + "_override_linear", "txt")).replace("\\", "/") + overrideLinearFile = pathlib.Path(self.tempdir) / f'{self.modelName}_override_linear.txt' - file = open(overrideLinearFile, "w") - for (key, value) in self.overridevariables.items(): - name = key + "=" + value + "\n" - file.write(name) - for (key, value) in self.linearOptions.items(): - name = key + "=" + str(value) + "\n" - file.write(name) - file.close() + with open(overrideLinearFile, "w") as file: + for key, value in self.overridevariables.items(): + file.write(f"{key}={value}\n") + for key, value in self.linearOptions.items(): + file.write(f"{key}={value}\n") - override = " -overrideFile=" + overrideLinearFile + override = " -overrideFile=" + overrideLinearFile.as_posix() logger.debug(f"overwrite = {override}") if self.inputFlag: @@ -1620,53 +1557,47 @@ def linearize(self, lintime=None, simflags=None): # 22 csvinput = "" # prepare the linearization runtime command - if (platform.system() == "Windows"): - getExeFile = os.path.join(self.tempdir, '{}.{}'.format(self.modelName, "exe")).replace("\\", "/") - else: - getExeFile = os.path.join(self.tempdir, self.modelName).replace("\\", "/") + exe_file = self.get_exe_file() - if lintime is None: - linruntime = " -l=" + str(self.linearOptions["stopTime"]) - else: - linruntime = " -l=" + lintime + linruntime = f' -l={lintime or self.linearOptions["stopTime"]}' if simflags is None: simflags = "" else: simflags = " " + simflags - if (os.path.exists(getExeFile)): - cmd = getExeFile + linruntime + override + csvinput + simflags + if not exe_file.exists(): + raise Exception(f"Error: Application file path not found: {exe_file}") + else: + cmd = exe_file.as_posix() + linruntime + override + csvinput + simflags cmd = cmd.split(' ') self._run_cmd(cmd=cmd) - else: - raise Exception("Error: Application file path not found: " + getExeFile) # code to get the matrix and linear inputs, outputs and states - linearFile = os.path.join(self.tempdir, "linearized_model.py").replace("\\", "/") + linearFile = pathlib.Path(self.tempdir) / "linearized_model.py" # support older openmodelica versions before OpenModelica v1.16.2 where linearize() generates "linear_modelname.mo" file - if not os.path.exists(linearFile): - linearFile = '{}_{}.{}'.format('linear', self.modelName, 'py') + if not linearFile.exists(): + linearFile = pathlib.Path(f'linear_{self.modelName}.py') - if os.path.exists(linearFile): - # this function is called from the generated python code linearized_model.py at runtime, - # to improve the performance by directly reading the matrices A, B, C and D from the julia code and avoid building the linearized modelica model - try: - # do not add the linearfile directory to path, as multiple execution of linearization will always use the first added path, instead execute the file - # https://github.com/OpenModelica/OMPython/issues/196 - module = importlib.machinery.SourceFileLoader("linearized_model", linearFile).load_module() - result = module.linearized_model() - (n, m, p, x0, u0, A, B, C, D, stateVars, inputVars, outputVars) = result - self.linearinputs = inputVars - self.linearoutputs = outputVars - self.linearstates = stateVars - return [A, B, C, D] - except ModuleNotFoundError: - raise Exception("ModuleNotFoundError: No module named 'linearized_model'") - else: + if not linearFile.exists(): errormsg = self.sendExpression("getErrorString()") - raise ModelicaSystemError("Linearization failed: {} not found: {}".format(repr(linearFile), errormsg)) + raise ModelicaSystemError(f"Linearization failed: {linearFile} not found: {errormsg}") + + # this function is called from the generated python code linearized_model.py at runtime, + # to improve the performance by directly reading the matrices A, B, C and D from the julia code and avoid building the linearized modelica model + try: + # do not add the linearfile directory to path, as multiple execution of linearization will always use the first added path, instead execute the file + # https://github.com/OpenModelica/OMPython/issues/196 + module = importlib.machinery.SourceFileLoader("linearized_model", linearFile.as_posix()).load_module() + result = module.linearized_model() + (n, m, p, x0, u0, A, B, C, D, stateVars, inputVars, outputVars) = result + self.linearinputs = inputVars + self.linearoutputs = outputVars + self.linearstates = stateVars + return [A, B, C, D] + except ModuleNotFoundError: + raise Exception("ModuleNotFoundError: No module named 'linearized_model'") def getLinearInputs(self): """ diff --git a/tests/test_ModelicaSystem.py b/tests/test_ModelicaSystem.py index 44884a32b..5704c9509 100644 --- a/tests/test_ModelicaSystem.py +++ b/tests/test_ModelicaSystem.py @@ -4,15 +4,16 @@ import shutil import os import pathlib +import numpy as np class ModelicaSystemTester(unittest.TestCase): def __init__(self, *args, **kwargs): super(ModelicaSystemTester, self).__init__(*args, **kwargs) - self.tmp = tempfile.mkdtemp(prefix='tmpOMPython.tests') - with open("%s/M.mo" % self.tmp, "w") as fout: + self.tmp = pathlib.Path(tempfile.mkdtemp(prefix='tmpOMPython.tests')) + with open(self.tmp / "M.mo", "w") as fout: fout.write("""model M - Real x(start = 1); + Real x(start = 1, fixed = true); parameter Real a = -1; equation der(x) = x*a; @@ -24,12 +25,12 @@ def __del__(self): def testModelicaSystemLoop(self): def worker(): - filePath = os.path.join(self.tmp, "M.mo").replace("\\", "/") + filePath = (self.tmp / "M.mo").as_posix() m = OMPython.ModelicaSystem(filePath, "M") m.simulate() m.convertMo2Fmu(fmuType="me") - for _ in range(10): - worker() + for _ in range(10): + worker() def test_setParameters(self): omc = OMPython.OMCSessionZMQ() @@ -79,16 +80,10 @@ def test_setSimulationOptions(self): def test_relative_path(self): cwd = pathlib.Path.cwd() - (fd, name) = tempfile.mkstemp(dir=cwd, text=True) + (fd, name) = tempfile.mkstemp(prefix='tmpOMPython.tests', dir=cwd, text=True) try: with os.fdopen(fd, 'w') as f: - f.write("""model M - Real x(start = 1, fixed=true); - parameter Real a = -1; -equation - der(x) = x*a; -end M; -""") + f.write((self.tmp / "M.mo").read_text()) model_file = pathlib.Path(name).relative_to(cwd) model_relative = str(model_file) @@ -100,6 +95,297 @@ def test_relative_path(self): # clean up the temporary file model_file.unlink() + def test_customBuildDirectory(self): + filePath = (self.tmp / "M.mo").as_posix() + tmpdir = self.tmp / "tmpdir1" + tmpdir.mkdir() + m = OMPython.ModelicaSystem(filePath, "M", raiseerrors=True, + customBuildDirectory=tmpdir) + assert pathlib.Path(m.getWorkDirectory()).resolve() == tmpdir.resolve() + result_file = tmpdir / "a.mat" + assert not result_file.exists() + m.simulate(resultfile="a.mat") + assert result_file.is_file() + + def test_getSolutions(self): + filePath = (self.tmp / "M.mo").as_posix() + mod = OMPython.ModelicaSystem(filePath, "M", raiseerrors=True) + x0 = 1 + a = -1 + tau = -1 / a + stopTime = 5*tau + mod.setSimulationOptions([f"stopTime={stopTime}", "stepSize=0.1", "tolerance=1e-8"]) + mod.simulate() + + x = mod.getSolutions("x") + t, x2 = mod.getSolutions(["time", "x"]) + assert (x2 == x).all() + sol_names = mod.getSolutions() + assert isinstance(sol_names, tuple) + assert "time" in sol_names + assert "x" in sol_names + assert "der(x)" in sol_names + with self.assertRaises(OMPython.ModelicaSystemError): + mod.getSolutions("t") # variable 't' does not exist + assert np.isclose(t[0], 0), "time does not start at 0" + assert np.isclose(t[-1], stopTime), "time does not end at stopTime" + x_analytical = x0 * np.exp(a*t) + assert np.isclose(x, x_analytical, rtol=1e-4).all() + + def test_getters(self): + model_file = self.tmp / "M_getters.mo" + model_file.write_text(""" +model M_getters + Real x(start = 1, fixed = true); + output Real y "the derivative"; + parameter Real a = -0.5; + parameter Real b = 0.1; +equation + der(x) = x*a + b; + y = der(x); +end M_getters; +""") + mod = OMPython.ModelicaSystem(model_file.as_posix(), "M_getters", raiseerrors=True) + + q = mod.getQuantities() + assert isinstance(q, list) + assert sorted(q, key=lambda d: d["name"]) == sorted([ + { + 'alias': 'noAlias', + 'aliasvariable': None, + 'causality': 'local', + 'changeable': 'true', + 'description': None, + 'max': None, + 'min': None, + 'name': 'x', + 'start': '1.0', + 'unit': None, + 'variability': 'continuous', + }, + { + 'alias': 'noAlias', + 'aliasvariable': None, + 'causality': 'local', + 'changeable': 'false', + 'description': None, + 'max': None, + 'min': None, + 'name': 'der(x)', + 'start': None, + 'unit': None, + 'variability': 'continuous', + }, + { + 'alias': 'noAlias', + 'aliasvariable': None, + 'causality': 'output', + 'changeable': 'false', + 'description': 'the derivative', + 'max': None, + 'min': None, + 'name': 'y', + 'start': '-0.4', + 'unit': None, + 'variability': 'continuous', + }, + { + 'alias': 'noAlias', + 'aliasvariable': None, + 'causality': 'parameter', + 'changeable': 'true', + 'description': None, + 'max': None, + 'min': None, + 'name': 'a', + 'start': '-0.5', + 'unit': None, + 'variability': 'parameter', + }, + { + 'alias': 'noAlias', + 'aliasvariable': None, + 'causality': 'parameter', + 'changeable': 'true', + 'description': None, + 'max': None, + 'min': None, + 'name': 'b', + 'start': '0.1', + 'unit': None, + 'variability': 'parameter', + } + ], key=lambda d: d["name"]) + + assert mod.getQuantities("y") == [ + { + 'alias': 'noAlias', + 'aliasvariable': None, + 'causality': 'output', + 'changeable': 'false', + 'description': 'the derivative', + 'max': None, + 'min': None, + 'name': 'y', + 'start': '-0.4', + 'unit': None, + 'variability': 'continuous', + } + ] + + assert mod.getQuantities(["y", "x"]) == [ + { + 'alias': 'noAlias', + 'aliasvariable': None, + 'causality': 'output', + 'changeable': 'false', + 'description': 'the derivative', + 'max': None, + 'min': None, + 'name': 'y', + 'start': '-0.4', + 'unit': None, + 'variability': 'continuous', + }, + { + 'alias': 'noAlias', + 'aliasvariable': None, + 'causality': 'local', + 'changeable': 'true', + 'description': None, + 'max': None, + 'min': None, + 'name': 'x', + 'start': '1.0', + 'unit': None, + 'variability': 'continuous', + }, + ] + + assert mod.getInputs() == {} + # getOutputs before simulate() + assert mod.getOutputs() == {'y': '-0.4'} + assert mod.getOutputs("y") == ["-0.4"] + assert mod.getOutputs(["y", "y"]) == ["-0.4", "-0.4"] + + # getContinuous before simulate(): + assert mod.getContinuous() == { + 'x': '1.0', + 'der(x)': None, + 'y': '-0.4' + } + assert mod.getContinuous("y") == ['-0.4'] + assert mod.getContinuous(["y", "x"]) == ['-0.4', '1.0'] + assert mod.getContinuous("a") == ["NotExist"] # a is a parameter + + stopTime = 1.0 + a = -0.5 + b = 0.1 + x0 = 1.0 + x_analytical = -b/a + (x0 + b/a) * np.exp(a * stopTime) + dx_analytical = (x0 + b/a) * a * np.exp(a * stopTime) + mod.setSimulationOptions(f"stopTime={stopTime}") + mod.simulate() + + # getOutputs after simulate() + d = mod.getOutputs() + assert d.keys() == {"y"} + assert np.isclose(d["y"], dx_analytical, 1e-4) + assert mod.getOutputs("y") == [d["y"]] + assert mod.getOutputs(["y", "y"]) == [d["y"], d["y"]] + + # getContinuous after simulate() should return values at end of simulation: + with self.assertRaises(OMPython.ModelicaSystemError): + mod.getContinuous("a") # a is a parameter + with self.assertRaises(OMPython.ModelicaSystemError): + mod.getContinuous(["x", "a", "y"]) # a is a parameter + d = mod.getContinuous() + assert d.keys() == {"x", "der(x)", "y"} + assert np.isclose(d["x"], x_analytical, 1e-4) + assert np.isclose(d["der(x)"], dx_analytical, 1e-4) + assert np.isclose(d["y"], dx_analytical, 1e-4) + assert mod.getContinuous("x") == [d["x"]] + assert mod.getContinuous(["y", "x"]) == [d["y"], d["x"]] + + with self.assertRaises(OMPython.ModelicaSystemError): + mod.setSimulationOptions("thisOptionDoesNotExist=3") + + def test_simulate_inputs(self): + model_file = self.tmp / "M_input.mo" + model_file.write_text(""" +model M_input + Real x(start=0, fixed=true); + input Real u1; + input Real u2; + output Real y; +equation + der(x) = u1 + u2; + y = x; +end M_input; +""") + mod = OMPython.ModelicaSystem(model_file.as_posix(), "M_input", raiseerrors=True) + + mod.setSimulationOptions("stopTime=1.0") + + # integrate zero (no setInputs call) - it should default to None -> 0 + assert mod.getInputs() == { + "u1": None, + "u2": None, + } + mod.simulate() + y = mod.getSolutions("y")[0] + assert np.isclose(y[-1], 0.0) + + # integrate a constant + mod.setInputs("u1=2.5") + assert mod.getInputs() == { + "u1": [ + (0.0, 2.5), + (1.0, 2.5), + ], + "u2": None, + } + mod.simulate() + y = mod.getSolutions("y")[0] + assert np.isclose(y[-1], 2.5) + + # now let's integrate the sum of two ramps + mod.setInputs("u1=[(0.0, 0.0), (0.5, 2), (1.0, 0)]") + assert mod.getInputs("u1") == [[ + (0.0, 0.0), + (0.5, 2.0), + (1.0, 0.0), + ]] + mod.simulate() + y = mod.getSolutions("y")[0] + assert np.isclose(y[-1], 1.0) + + # let's try some edge cases + mod.setInputs("u1=[(-0.5, 0.0), (1.0, 1)]") + # unmatched startTime + with self.assertRaises(OMPython.ModelicaSystemError): + mod.simulate() + # unmatched stopTime + mod.setInputs("u1=[(0.0, 0.0), (0.5, 1)]") + with self.assertRaises(OMPython.ModelicaSystemError): + mod.simulate() + + # Let's use both inputs, but each one with different number of of + # samples. This has an effect when generating the csv file. + mod.setInputs([ + "u1=[(0.0, 0), (1.0, 1)]", + "u2=[(0.0, 0), (0.25, 0.5), (0.5, 1.0), (1.0, 0)]", + ]) + mod.simulate() + assert pathlib.Path(mod.csvFile).read_text() == """time,u1,u2,end +0.0,0.0,0.0,0 +0.25,0.25,0.5,0 +0.5,0.5,1.0,0 +1.0,1.0,0.0,0 +""" + y = mod.getSolutions("y")[0] + assert np.isclose(y[-1], 1.0) + if __name__ == '__main__': unittest.main() diff --git a/tests/test_ZMQ.py b/tests/test_ZMQ.py index e13ea421a..539bd7335 100644 --- a/tests/test_ZMQ.py +++ b/tests/test_ZMQ.py @@ -40,6 +40,12 @@ def testSimulate(self): self.assertNotEqual("", self.om.sendExpression('res.resultFile')) self.clean() + def test_execute(self): + self.assertEqual('"HelloWorld!"\n', self.om.execute('"HelloWorld!"')) + self.assertEqual('"HelloWorld!"\n', self.om.sendExpression('"HelloWorld!"', parsed=False)) + self.assertEqual('HelloWorld!', self.om.sendExpression('"HelloWorld!"', parsed=True)) + self.clean() + if __name__ == '__main__': unittest.main() diff --git a/tests/test_linearization.py b/tests/test_linearization.py index 151f05658..07709c272 100644 --- a/tests/test_linearization.py +++ b/tests/test_linearization.py @@ -1,13 +1,16 @@ import OMPython import tempfile import shutil -import os +import unittest +import pathlib +import numpy as np -class Test_Linearization: - def loadModel(self): - self.tmp = tempfile.mkdtemp(prefix='tmpOMPython.tests') - with open("%s/linearTest.mo" % self.tmp, "w") as fout: +class Test_Linearization(unittest.TestCase): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.tmp = pathlib.Path(tempfile.mkdtemp(prefix='tmpOMPython.tests')) + with open(self.tmp / "linearTest.mo", "w") as fout: fout.write(""" model linearTest Real x1(start=1); @@ -21,15 +24,13 @@ def loadModel(self): f*x4 - e*x3 - der(x3) = x1; der(x4) = x1 + x2 + der(x3) + x4; end linearTest; - """) +""") def __del__(self): shutil.rmtree(self.tmp, ignore_errors=True) def test_example(self): - self.loadModel() - filePath = os.path.join(self.tmp, "linearTest.mo").replace("\\", "/") - print(filePath) + filePath = (self.tmp / "linearTest.mo").as_posix() mod = OMPython.ModelicaSystem(filePath, "linearTest") [A, B, C, D] = mod.linearize() expected_matrixA = [[-3, 2, 0, 0], [-7, 0, -5, 1], [-1, 0, -1, 4], [0, 1, -1, 5]] @@ -37,3 +38,47 @@ def test_example(self): assert B == [], f"Matrix does not match the expected value. Got: {B}, Expected: {[]}" assert C == [], f"Matrix does not match the expected value. Got: {C}, Expected: {[]}" assert D == [], f"Matrix does not match the expected value. Got: {D}, Expected: {[]}" + assert mod.getLinearInputs() == [] + assert mod.getLinearOutputs() == [] + assert mod.getLinearStates() == ["x1", "x2", "x3", "x4"] + + def test_getters(self): + model_file = self.tmp / "pendulum.mo" + model_file.write_text(""" +model Pendulum + Real phi(start=Modelica.Constants.pi, fixed=true); + Real omega(start=0, fixed=true); + input Real u1; + input Real u2; + output Real y1; + output Real y2; + parameter Real l = 1.2; + parameter Real g = 9.81; +equation + der(phi) = omega + u2; + der(omega) = -g/l * sin(phi); + y1 = y2 + 0.5*omega; + y2 = phi + u1; +end Pendulum; +""") + mod = OMPython.ModelicaSystem(model_file.as_posix(), "Pendulum", ["Modelica"], raiseerrors=True) + + d = mod.getLinearizationOptions() + assert isinstance(d, dict) + assert "startTime" in d + assert "stopTime" in d + assert mod.getLinearizationOptions(["stopTime", "startTime"]) == [d["stopTime"], d["startTime"]] + mod.setLinearizationOptions("stopTime=0.02") + assert mod.getLinearizationOptions("stopTime") == ["0.02"] + + mod.setInputs(["u1=0", "u2=0"]) + [A, B, C, D] = mod.linearize() + g = float(mod.getParameters("g")[0]) + l = float(mod.getParameters("l")[0]) + assert mod.getLinearInputs() == ["u1", "u2"] + assert mod.getLinearStates() == ["omega", "phi"] + assert mod.getLinearOutputs() == ["y1", "y2"] + assert np.isclose(A, [[0, g/l], [1, 0]]).all() + assert np.isclose(B, [[0, 0], [0, 1]]).all() + assert np.isclose(C, [[0.5, 1], [0, 1]]).all() + assert np.isclose(D, [[1, 0], [1, 0]]).all() diff --git a/tests/test_optimization.py b/tests/test_optimization.py new file mode 100644 index 000000000..ae93d0b8e --- /dev/null +++ b/tests/test_optimization.py @@ -0,0 +1,78 @@ +import OMPython +import tempfile +import shutil +import unittest +import pathlib +import numpy as np + + +class Test_Linearization(unittest.TestCase): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.tmp = pathlib.Path(tempfile.mkdtemp(prefix='tmpOMPython.tests')) + + def __del__(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_example(self): + model_file = self.tmp / "BangBang2021.mo" + model_file.write_text(""" +model BangBang2021 "Model to verify that optimization gives bang-bang optimal control" + parameter Real m = 1; + parameter Real p = 1 "needed for final constraints"; + + Real a; + Real v(start = 0, fixed = true); + Real pos(start = 0, fixed = true); + Real pow(min = -30, max = 30) = f * v annotation(isConstraint = true); + + input Real f(min = -10, max = 10); + + Real costPos(nominal = 1) = -pos "minimize -pos(tf)" annotation(isMayer=true); + + Real conSpeed(min = 0, max = 0) = p * v " 0<= p*v(tf) <=0" annotation(isFinalConstraint = true); + +equation + + der(pos) = v; + der(v) = a; + f = m * a; + +annotation(experiment(StartTime = 0, StopTime = 1, Tolerance = 1e-07, Interval = 0.01), +__OpenModelica_simulationFlags(s="optimization", optimizerNP="1"), +__OpenModelica_commandLineOptions="+g=Optimica"); + +end BangBang2021; +""") + + mod = OMPython.ModelicaSystem(model_file.as_posix(), "BangBang2021", + raiseerrors=True) + + mod.setOptimizationOptions(["numberOfIntervals=16", "stopTime=1", + "stepSize=0.001", "tolerance=1e-8"]) + + # test the getter + assert mod.getOptimizationOptions()["stopTime"] == "1" + assert mod.getOptimizationOptions("stopTime") == ["1"] + assert mod.getOptimizationOptions(["tolerance", "stopTime"]) == ["1e-8", "1"] + + r = mod.optimize() + # it is necessary to specify resultfile, otherwise it wouldn't find it. + time, f, v = mod.getSolutions(["time", "f", "v"], resultfile=r["resultFile"]) + assert np.isclose(f[0], 10) + assert np.isclose(f[-1], -10) + + def f_fcn(time, v): + if time < 0.3: + return 10 + if time <= 0.5: + return 30 / v + if time < 0.7: + return -30 / v + return -10 + f_expected = [f_fcn(t, v) for t, v in zip(time, v)] + + # The sharp edge at time=0.5 probably won't match, let's leave that out. + matches = np.isclose(f, f_expected, 1e-3) + assert matches[:498].all() + assert matches[502:].all()