diff --git a/src/opi/execution/base.py b/src/opi/execution/base.py new file mode 100644 index 00000000..07d26173 --- /dev/null +++ b/src/opi/execution/base.py @@ -0,0 +1,494 @@ +""" +Module that contains `BaseRunner` class which facilitates the execution of ORCA binaries. + +Attributes +---------- +RunnerType: + Helper variable for type annotation. +P: + ParamSpec helper variable. +R: + Helper variable for type annotation. +""" + +import os +import shutil +import subprocess +from contextlib import nullcontext +from io import TextIOWrapper +from pathlib import Path +from subprocess import CompletedProcess +from typing import Any, Callable, Concatenate, ParamSpec, Sequence, TypeVar, cast + +from opi import ORCA_MINIMAL_VERSION +from opi.lib.orca_binary import OrcaBinary +from opi.utils.config import get_config +from opi.utils.misc import add_to_env, check_minimal_version, delete_empty_file, resolve_binary_name +from opi.utils.orca_version import OrcaVersion + +RunnerType = TypeVar("RunnerType", bound="BaseRunner") +P = ParamSpec("P") +R = TypeVar("R") + + +def _orca_environment( + runner: Callable[Concatenate[RunnerType, P], R], / +) -> Callable[Concatenate[RunnerType, P], R]: + """ + Wrapper that temporarily modifies environment, to ensure that the correct ORCA and OpenMPI installation are found. + Resets environment upon exiting. + + Parameters + ---------- + runner : Callable[Concatenate[RunnerType, P], R] + Function that is to be wrapped. + """ + + def wrapper(self: RunnerType, /, *args: Any, **kwargs: Any) -> R: + org_env = os.environ.copy() + try: + # ////////////////////////////// + # > SETUP ENVIRONMENT + # ////////////////////////////// + + # > Updating necessary environmental variables. 
+ add_to_env("PATH", str(self._orca_bin_folder), prepend=True) + add_to_env("LD_LIBRARY_PATH", str(self._orca_lib_folder), prepend=True) + + # > Setting Open MPI path + if self._open_mpi_path: + add_to_env("PATH", str(self._open_mpi_path / "bin"), prepend=True) + add_to_env("LD_LIBRARY_PATH", str(self._open_mpi_path / "lib"), prepend=True) + + # ////////////////////////////// + # > Call Runner + # ////////////////////////////// + return runner(self, *args, **kwargs) + finally: + os.environ = org_env # type: ignore + + # << END OF INNER FUNC + + return wrapper + + +class BaseRunner: + """ + Base class that facilitates the execution of ORCA binaries, especially the main ORCA binary. + Makes sure that correct ORCA binary and MPI libraries are used. + This class is intended to be subclassed to execute an ORCA binary. + """ + + def __init__(self, working_dir: Path | str | os.PathLike[str] | None = None) -> None: + """ + Parameters + ---------- + working_dir : Path | str | os.PathLike[str] | None, default = None + Optional working directory for execution. + """ + # > Working dir. Must exist! + self._working_dir: Path = Path.cwd() + self.working_dir: Path = cast(Path, working_dir) + + # ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + # > ORCA & Open MPI Installation + # ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + # > Either the main ORCA folder contains a 'bin/' and a 'lib/' folder or all files are just contained in the main folder. + self._orca_bin_folder: Path | None = None + self._orca_lib_folder: Path | None = None + # > Open MPI location + # > The variable stores the path to base folder of Open MPI. + # >> May stay `None` if Open MPI is already present in $PATH. 
+ self._open_mpi_path: Path | None = None + + self.set_orca_path() + self.set_open_mpi_path() + + @property + def working_dir(self) -> Path: + return self._working_dir + + @working_dir.setter + def working_dir(self, value: Path | str | os.PathLike[str] | None) -> None: + """ + Parameters + ---------- + value : Path | str | os.PathLike[str] | None + """ + + if value is None: + # > Unsetting working_dir by setting it to CWD. + # > Thereby, working_dir is never "unset". + self._working_dir = Path.cwd() + else: + value = Path(value) + if not value.is_dir(): + raise ValueError( + f"{self.__class__.__name__}.working_dir: {value} does is not a directory!" + ) + # > Completely resolving path + self._working_dir = value.expanduser().resolve() + + @_orca_environment + def run( + self, + binary: OrcaBinary, + args: Sequence[str] = (), + /, + *, + stdin_str: str | None = None, + stdout: Path | None = None, + stderr: Path | None = None, + silent: bool = True, + capture: bool = False, + cwd: Path | None = None, + timeout: int = -1, + ) -> subprocess.CompletedProcess[str] | None: + """ + Function that executes ORCA binary. + + Parameters + ---------- + binary : OrcaBinary + Name of ORCA binary to be executed. Path is automatically resolved based on configuration. + args : Sequence[str], default: () + Command line arguments to pass to ORCA binary. + stdin_str: str | None = None + String to be passed to stdin. + stdout : Path | None, default: None + Dump STDOUT to a file. + stderr : Path | None, default: None + Dump STDERR to a file. + silent : bool, default: True + Redirect STDOUT and STDERR to null-device. + Is overruled respectively by `stdout` and `stderr` and `capture`. + capture : bool, default: False + Capture STDOUT and STDERR and return with `CompletedProcess[str]` object. + Is overruled respectively by `stdout` and `stderr`. + cwd : Path | None, default: None + Set working directory for execution. Overrules `self.working_dir`. 
+ timeout : int, default: -1 + Optional timeout in seconds to wait for process to complete. + + Returns + ------- + subprocess.subprocess.CompletedProcess[str] | None: + Completed ORCA process. + + Raises + ------ + FileNotFound: + Error if path to ORCA binary cannot be resolved. + subprocess.TimeoutExpired: + If `timeout>-1` and the process times out. + """ + + # ------------------------------------------------------------ + def determine_dump(source: Path | None = None) -> TextIOWrapper: + """ + Determine where to dump `source` to. + + Parameters + ---------- + source : Path | None, default: None + """ + + if source: + return source.open("w") + elif capture: + return cast(TextIOWrapper, nullcontext(subprocess.PIPE)) + elif silent: + return Path(os.devnull).open("w") + else: + return cast(TextIOWrapper, nullcontext()) + + # ------------------------------------------------------------ + + if not isinstance(binary, OrcaBinary): + raise ValueError(f"`binary` must be of type OrcaBinary, not: {type(binary)}") + + # > Working dir + if not cwd: + cwd = self.working_dir + + # > Get requested ORCA binary + orca_bin = self.get_orca_binary(binary) + + # > STDOUT and STDERR capturing/dumping + outfile = determine_dump(stdout) + errfile = determine_dump(stderr) + + # > Assembling full call + cmd = [str(orca_bin)] + if args: + cmd += list(args) + + # Run the binary + proc = None + try: + with outfile as f_out, errfile as f_err: + proc = subprocess.run( + cmd, + input=stdin_str, + stdout=f_out, + stderr=f_err, + cwd=cwd, + text=True, + timeout=timeout if timeout > 0 else None, + ) + return proc + except subprocess.TimeoutExpired: + raise + finally: + # > Delete empty STDOUT and STDERR dumps + if stdout: + delete_empty_file(stdout) + if stderr: + delete_empty_file(stderr) + + def get_version(self) -> OrcaVersion | None: + """ + Get the ORCA version from the main ORCA binary. + + Returns + ------- + OrcaVersion: + Version of the ORCA. 
+ None: + If the version could not be determined. + """ + + try: + # > May raise subprocess.TimeoutExpired + orca_proc = self.run(OrcaBinary.ORCA, ["--version"], capture=True, timeout=5) + + # > Pleasing type checker + assert isinstance(orca_proc, CompletedProcess) + return OrcaVersion.from_output(orca_proc.stdout) + + except (subprocess.TimeoutExpired, ValueError, AssertionError): + return None + + def check_version(self, *, ignore_errors: bool = False) -> bool | None: + """ + Check if the ORCA version of the binary is compatible with the current OPI version. + + Parameters + ---------- + ignore_errors : bool, default: False + False: Raises RuntimeError if version is not compatible or could not be determined. + True: Return True if version is compatible, else return False. Also if the version could not be determined. + + Returns + ------- + bool : + True: If version is compatible. + False: If version is not compatible. + None : + If version could not be determined. + + Raises + ------ + RuntimeError: If `ignore_errors` is False and version is not compatible or could not be determined. + """ + + orca_vers = self.get_version() + + # > Path as string to ORCA binary + try: + orca_bin_str = f"\nORCA binary: {self.get_orca_binary(OrcaBinary.ORCA)}" + except FileNotFoundError: + orca_bin_str = "" + + if orca_vers is None: + if ignore_errors: + return None + else: + raise RuntimeError( + f"Could not determine version of ORCA binary." + f" Make sure ORCA is installed and configured correctly." + f" Minimally required ORCA version: {ORCA_MINIMAL_VERSION}{orca_bin_str}" + ) + + elif not check_minimal_version(orca_vers): + if ignore_errors: + return False + else: + raise RuntimeError( + f"ORCA version {orca_vers} is not supported. 
Make sure to install at least version:" + f" {ORCA_MINIMAL_VERSION}{orca_bin_str}" + ) + else: + return True + + @staticmethod + def _determine_orca_paths(orca_path: Path, /) -> tuple[Path, Path]: + """ + Determine the actual path to the folders that contains the ORCA binaries as well as the libraries. + We allow several formats, to specify the path to ORCA. + + Parameters + ---------- + orca_path : Path + Can either point to: + 1) the main ORCA binary directly, which must have the name "orca". + 2) the folder which contains the main ORCA binary `orca` either `./orca` or `./bin/orca` + + Returns + ------- + Path: + The path to the folder that contains the ORCA binaries. + Path: + The path to the folder that contains the ORCA libraries. + Both paths can coincide. + """ + + if not isinstance(orca_path, Path): + raise TypeError(f"'orca_path' parameter is not a Path, but: {type(orca_path)}") + + # > Resolving path. This will also check if the target exists + try: + orca_path = orca_path.expanduser().resolve(strict=True) + except FileNotFoundError: + raise FileNotFoundError(f"ORCA path does not exist: {orca_path}") + + # > Case 1 + if orca_path.is_file() and orca_path.name == resolve_binary_name(OrcaBinary.ORCA): + # > Check if the parent dir is 'bin/' + if orca_path.parent.name == "bin": + orca_bin_folder = orca_path.parent + orca_lib_folder = orca_bin_folder.with_name("lib") + else: + orca_bin_folder = orca_path.parent + orca_lib_folder = orca_bin_folder + + # > Case 2 + elif orca_path.is_dir(): + # > Check if the current dir contains a bin or a lib folder. 
+ if (orca_path / "bin").exists(): + orca_bin_folder = orca_path / "bin" + orca_lib_folder = orca_path / "lib" + else: + orca_bin_folder = orca_path + orca_lib_folder = orca_path + + # > NOT FOUND + else: + raise RuntimeError(f"Path to ORCA is invalid: {orca_path}") + + # > Make sure both folders exists + assert orca_bin_folder is not None + assert orca_lib_folder is not None + # > Check that binary folder exists + if not orca_bin_folder.is_dir(): + raise FileNotFoundError( + f"The ORCA binary folder does not exists or is not a folder: {orca_bin_folder}" + ) + # > If the bin and lib folder do not coincide, we also check the lib folder. + if orca_bin_folder != orca_lib_folder and not orca_lib_folder.is_dir(): + raise FileNotFoundError( + f"The ORCA library folder does not exists or is not a folder: {orca_lib_folder}" + ) + + return orca_bin_folder, orca_lib_folder + + def set_orca_path(self, orca_path: Path | None = None, /) -> None: + """ + Determine and set the ORCA installation to be used. + + Parameters + ---------- + orca_path : Path | None, default: None + """ + + # > Fetching OPI config. Needs to fetched first, as it might be empty or not exist. 
+ orca_path_config = None + if config := get_config(): + orca_path_config = config.get("ORCA_PATH") + + # > Case 1: Path given via function parameters + if orca_path is not None: + if not isinstance(orca_path, Path): + raise TypeError(f"'orca_path' parameter is not a Path, but: {type(orca_path)}") + # << END OF IF + # << END OF IF + + # > Case 2: $OPI_PATH + elif opi_var_orca_path := os.environ.get("OPI_ORCA"): + orca_path = Path(opi_var_orca_path) + + # > Case 3: Config file + elif orca_path_config: + orca_path = Path(orca_path_config) + + # > Case 4: $PATH + elif var_orca_path := shutil.which("orca"): + orca_path = Path(var_orca_path) + + # > NOT FOUND + else: + raise RuntimeError("Could not find ORCA.") + + # > Now determine the bin/ and lib/ folder + self._orca_bin_folder, self._orca_lib_folder = self._determine_orca_paths(orca_path) + + def set_open_mpi_path(self, mpi_path: Path | None = None, /) -> None: + """ + Determine and set the Open MPI installation to be used. + + Parameters + ---------- + mpi_path : Path | None, default: None + """ + + # > Needs to fetched ahead of other check, as it might be empty or not exist. + mpi_path_config = None + if config := get_config(): + mpi_path_config = config.get("MPI_PATH") + + # > Case 1: Path given via function parameter + if mpi_path is not None: + if not isinstance(mpi_path, Path): + raise TypeError(f"'mpi_path' parameter is not a Path, but: {type(mpi_path)}") + # << END OF IF + + # > Case 2: $OPI_MPI + elif opi_var_open_mpi_path := os.environ.get("OPI_MPI"): + mpi_path = Path(opi_var_open_mpi_path) + + # > Case 3: Specified in config file + elif mpi_path_config: + mpi_path = Path(mpi_path_config) + + # > Case 4: MPI is already in the $PATH + # > Then we don't need to do anything. + # > Assumes that $LD_LIBRARY_PATH is also properly configured. + # > Case 5: Not configured/installed at all. + # In this case, ORCA can only be executed with a single core. 
+ # <<< END OF IF-BLOCK + + # > Now determine the bin/ and lib/ folder + if mpi_path: + self._open_mpi_path = mpi_path.expanduser().resolve(strict=True) + + def get_orca_binary(self, binary: OrcaBinary, /) -> Path: + """ + Get absolute path to any of ORCA binaries according to `self._orca_bin_path`. + + Parameters + ---------- + binary : OrcaBinary + Name of ORCA binary to search for. + """ + + assert self._orca_bin_folder is not None + + bin_name = resolve_binary_name(str(binary)) + + # > Full path to ORCA binary + orca_binary = self._orca_bin_folder / bin_name + + if not orca_binary.is_file(): + raise FileNotFoundError(f"The ORCA binary does not exist: {orca_binary}") + else: + return orca_binary diff --git a/src/opi/execution/core.py b/src/opi/execution/core.py index 86abcb30..213d44a8 100644 --- a/src/opi/execution/core.py +++ b/src/opi/execution/core.py @@ -8,241 +8,21 @@ """ import json -import os -import shutil -import subprocess -from contextlib import nullcontext -from io import TextIOWrapper from pathlib import Path -from subprocess import CompletedProcess -from typing import Any, Callable, Concatenate, ParamSpec, Sequence, TypeVar, cast +from typing import Sequence -from opi import ORCA_MINIMAL_VERSION +from opi.execution.base import BaseRunner from opi.lib.orca_binary import OrcaBinary -from opi.utils.config import get_config -from opi.utils.misc import add_to_env, check_minimal_version, delete_empty_file, resolve_binary_name -from opi.utils.orca_version import OrcaVersion -RunnerType = TypeVar("RunnerType", bound="Runner") -P = ParamSpec("P") -R = TypeVar("R") - -def _orca_environment( - runner: Callable[Concatenate[RunnerType, P], R], / -) -> Callable[Concatenate[RunnerType, P], R]: +class Runner(BaseRunner): """ - Wrapper that temporarily modifies environment, to ensure that the correct ORCA and OpenMPI installation are found. - Resets environment upon exiting. 
- - Parameters - ---------- - runner : Callable[Concatenate[RunnerType, P], R] - Function that is to be wrapped. + This class should be to used to execute the following ORCA binaries: + - `orca` + - `orca_plot` + - `orca_2json` """ - def wrapper(self: RunnerType, /, *args: Any, **kwargs: Any) -> R: - org_env = os.environ.copy() - try: - # ////////////////////////////// - # > SETUP ENVIRONMENT - # ////////////////////////////// - - # > Updating necessary environmental variables. - add_to_env("PATH", str(self._orca_bin_folder), prepend=True) - add_to_env("LD_LIBRARY_PATH", str(self._orca_lib_folder), prepend=True) - - # > Setting Open MPI path - if self._open_mpi_path: - add_to_env("PATH", str(self._open_mpi_path / "bin"), prepend=True) - add_to_env("LD_LIBRARY_PATH", str(self._open_mpi_path / "lib"), prepend=True) - - # ////////////////////////////// - # > Call Runner - # ////////////////////////////// - return runner(self, *args, **kwargs) - finally: - os.environ = org_env # type: ignore - - # << END OF INNER FUNC - - return wrapper - - -class Runner: - """ - Main class that facilities execution of ORCA binaries. - Makes sure that correct ORCA binary and MPI libraries are used. - This class should be to used to execute any ORCA binary. - """ - - def __init__(self, working_dir: Path | str | os.PathLike[str] | None = None) -> None: - """ - Parameters - ---------- - working_dir : Path | str | os.PathLike[str] | None, default = None - Optional working directory for execution. - """ - # > Working dir. Must exist! 
- self._working_dir: Path = Path.cwd() - self.working_dir: Path = cast(Path, working_dir) - - # ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - # > ORCA & Open MPI Installation - # ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - # > Either the main ORCA folder contains a 'bin/' and a 'lib/' folder or all files are just contained in the main folder. - self._orca_bin_folder: Path | None = None - self._orca_lib_folder: Path | None = None - # > Open MPI location - # > The variable stores the path to base folder of Open MPI. - # >> May stay `None` if Open MPI is already present in $PATH. - self._open_mpi_path: Path | None = None - - self.set_orca_path() - self.set_open_mpi_path() - - @property - def working_dir(self) -> Path: - return self._working_dir - - @working_dir.setter - def working_dir(self, value: Path | str | os.PathLike[str] | None) -> None: - """ - Parameters - ---------- - value : Path | str | os.PathLike[str] | None - """ - - if value is None: - # > Unsetting working_dir by setting it to CWD. - # > Thereby, working_dir is never "unset". - self._working_dir = Path.cwd() - else: - value = Path(value) - if not value.is_dir(): - raise ValueError( - f"{self.__class__.__name__}.working_dir: {value} does is not a directory!" - ) - # > Completely resolving path - self._working_dir = value.expanduser().resolve() - - @_orca_environment - def run( - self, - binary: OrcaBinary, - args: Sequence[str] = (), - /, - *, - stdin_str: str | None = None, - stdout: Path | None = None, - stderr: Path | None = None, - silent: bool = True, - capture: bool = False, - cwd: Path | None = None, - timeout: int = -1, - ) -> subprocess.CompletedProcess[str] | None: - """ - Function that executes ORCA binary. - - Parameters - ---------- - binary : OrcaBinary - Name of ORCA binary to be executed. Path is automatically resolved based on configuration. 
- args : Sequence[str], default: () - Command line arguments to pass to ORCA binary. - stdin_str: str | None = None - String to be passed to stdin. - stdout : Path | None, default: None - Dump STDOUT to a file. - stderr : Path | None, default: None - Dump STDERR to a file. - silent : bool, default: True - Redirect STDOUT and STDERR to null-device. - Is overruled respectively by `stdout` and `stderr` and `capture`. - capture : bool, default: False - Capture STDOUT and STDERR and return with `CompletedProcess[str]` object. - Is overruled respectively by `stdout` and `stderr`. - cwd : Path | None, default: None - Set working directory for execution. Overrules `self.working_dir`. - timeout : int, default: -1 - Optional timeout in seconds to wait for process to complete. - - Returns - ------- - subprocess.subprocess.CompletedProcess[str] | None: - Completed ORCA process. - - Raises - ------ - FileNotFound: - Error if path to ORCA binary cannot be resolved. - subprocess.TimeoutExpired: - If `timeout>-1` and the process times out. - """ - - # ------------------------------------------------------------ - def determine_dump(source: Path | None = None) -> TextIOWrapper: - """ - Determine where to dump `source` to. 
- - Parameters - ---------- - source : Path | None, default: None - """ - - if source: - return source.open("w") - elif capture: - return cast(TextIOWrapper, nullcontext(subprocess.PIPE)) - elif silent: - return Path(os.devnull).open("w") - else: - return cast(TextIOWrapper, nullcontext()) - - # ------------------------------------------------------------ - - if not isinstance(binary, OrcaBinary): - raise ValueError(f"`binary` must be of type OrcaBinary, not: {type(binary)}") - - # > Working dir - if not cwd: - cwd = self.working_dir - - # > Get requested ORCA binary - orca_bin = self.get_orca_binary(binary) - - # > STDOUT and STDERR capturing/dumping - outfile = determine_dump(stdout) - errfile = determine_dump(stderr) - - # > Assembling full call - cmd = [str(orca_bin)] - if args: - cmd += list(args) - - # Run the binary - proc = None - try: - with outfile as f_out, errfile as f_err: - proc = subprocess.run( - cmd, - input=stdin_str, - stdout=f_out, - stderr=f_err, - cwd=cwd, - text=True, - timeout=timeout if timeout > 0 else None, - ) - return proc - except subprocess.TimeoutExpired: - raise - finally: - # > Delete empty STDOUT and STDERR dumps - if stdout: - delete_empty_file(stdout) - if stderr: - delete_empty_file(stderr) - def run_orca( self, inpfile: Path, /, *extra_args: str, silent: bool = True, timeout: int = -1 ) -> None: @@ -349,253 +129,6 @@ def run_orca_plot( timeout=timeout, ) - def get_version(self) -> OrcaVersion | None: - """ - Get the ORCA version from the main ORCA binary. - - Returns - ------- - OrcaVersion: - Version of the ORCA. - None: - If the version could not be determined. 
- """ - - try: - # > May raise subprocess.TimeoutExpired - orca_proc = self.run(OrcaBinary.ORCA, ["--version"], capture=True, timeout=5) - - # > Pleasing type checker - assert isinstance(orca_proc, CompletedProcess) - return OrcaVersion.from_output(orca_proc.stdout) - - except (subprocess.TimeoutExpired, ValueError, AssertionError): - return None - - def check_version(self, *, ignore_errors: bool = False) -> bool | None: - """ - Check if the ORCA version of the binary is compatible with the current OPI version. - - Parameters - ---------- - ignore_errors : bool, default: False - False: Raises RuntimeError if version is not compatible or could not be determined. - True: Return True if version is compatible, else return False. Also if the version could not be determined. - - Returns - ------- - bool : - True: If version is compatible. - False: If version is not compatible. - None : - If version could not be determined. - - Raises - ------ - RuntimeError: If `ignore_errors` is False and version is not compatible or could not be determined. - """ - - orca_vers = self.get_version() - - # > Path as string to ORCA binary - try: - orca_bin_str = f"\nORCA binary: {self.get_orca_binary(OrcaBinary.ORCA)}" - except FileNotFoundError: - orca_bin_str = "" - - if orca_vers is None: - if ignore_errors: - return None - else: - raise RuntimeError( - f"Could not determine version of ORCA binary." - f" Make sure ORCA is installed and configured correctly." - f" Minimally required ORCA version: {ORCA_MINIMAL_VERSION}{orca_bin_str}" - ) - - elif not check_minimal_version(orca_vers): - if ignore_errors: - return False - else: - raise RuntimeError( - f"ORCA version {orca_vers} is not supported. 
Make sure to install at least version:" - f" {ORCA_MINIMAL_VERSION}{orca_bin_str}" - ) - else: - return True - - @staticmethod - def _determine_orca_paths(orca_path: Path, /) -> tuple[Path, Path]: - """ - Determine the actual path to the folders that contains the ORCA binaries as well as the libraries. - We allow several formats, to specify the path to ORCA. - - Parameters - ---------- - orca_path : Path - Can either point to: - 1) the main ORCA binary directly, which must have the name "orca". - 2) the folder which contains the main ORCA binary `orca` either `./orca` or `./bin/orca` - - Returns - ------- - Path: - The path to the folder that contains the ORCA binaries. - Path: - The path to the folder that contains the ORCA libraries. - Both paths can coincide. - """ - - if not isinstance(orca_path, Path): - raise TypeError(f"'orca_path' parameter is not a Path, but: {type(orca_path)}") - - # > Resolving path. This will also check if the target exists - try: - orca_path = orca_path.expanduser().resolve(strict=True) - except FileNotFoundError: - raise FileNotFoundError(f"ORCA path does not exist: {orca_path}") - - # > Case 1 - if orca_path.is_file() and orca_path.name == resolve_binary_name(OrcaBinary.ORCA): - # > Check if the parent dir is 'bin/' - if orca_path.parent.name == "bin": - orca_bin_folder = orca_path.parent - orca_lib_folder = orca_bin_folder.with_name("lib") - else: - orca_bin_folder = orca_path.parent - orca_lib_folder = orca_bin_folder - - # > Case 2 - elif orca_path.is_dir(): - # > Check if the current dir contains a bin or a lib folder. 
- if (orca_path / "bin").exists(): - orca_bin_folder = orca_path / "bin" - orca_lib_folder = orca_path / "lib" - else: - orca_bin_folder = orca_path - orca_lib_folder = orca_path - - # > NOT FOUND - else: - raise RuntimeError(f"Path to ORCA is invalid: {orca_path}") - - # > Make sure both folders exists - assert orca_bin_folder is not None - assert orca_lib_folder is not None - # > Check that binary folder exists - if not orca_bin_folder.is_dir(): - raise FileNotFoundError( - f"The ORCA binary folder does not exists or is not a folder: {orca_bin_folder}" - ) - # > If the bin and lib folder do not coincide, we also check the lib folder. - if orca_bin_folder != orca_lib_folder and not orca_lib_folder.is_dir(): - raise FileNotFoundError( - f"The ORCA library folder does not exists or is not a folder: {orca_lib_folder}" - ) - - return orca_bin_folder, orca_lib_folder - - def set_orca_path(self, orca_path: Path | None = None, /) -> None: - """ - Determine and set the ORCA installation to be used. - - Parameters - ---------- - orca_path : Path | None, default: None - """ - - # > Fetching OPI config. Needs to fetched first, as it might be empty or not exist. 
- orca_path_config = None - if config := get_config(): - orca_path_config = config.get("ORCA_PATH") - - # > Case 1: Path given via function parameters - if orca_path is not None: - if not isinstance(orca_path, Path): - raise TypeError(f"'orca_path' parameter is not a Path, but: {type(orca_path)}") - # << END OF IF - # << END OF IF - - # > Case 2: $OPI_PATH - elif opi_var_orca_path := os.environ.get("OPI_ORCA"): - orca_path = Path(opi_var_orca_path) - - # > Case 3: Config file - elif orca_path_config: - orca_path = Path(orca_path_config) - - # > Case 4: $PATH - elif var_orca_path := shutil.which("orca"): - orca_path = Path(var_orca_path) - - # > NOT FOUND - else: - raise RuntimeError("Could not find ORCA.") - - # > Now determine the bin/ and lib/ folder - self._orca_bin_folder, self._orca_lib_folder = self._determine_orca_paths(orca_path) - - def set_open_mpi_path(self, mpi_path: Path | None = None, /) -> None: - """ - Determine and set the Open MPI installation to be used. - - Parameters - ---------- - mpi_path : Path | None, default: None - """ - - # > Needs to fetched ahead of other check, as it might be empty or not exist. - mpi_path_config = None - if config := get_config(): - mpi_path_config = config.get("MPI_PATH") - - # > Case 1: Path given via function parameter - if mpi_path is not None: - if not isinstance(mpi_path, Path): - raise TypeError(f"'mpi_path' parameter is not a Path, but: {type(mpi_path)}") - # << END OF IF - - # > Case 2: $OPI_MPI - elif opi_var_open_mpi_path := os.environ.get("OPI_MPI"): - mpi_path = Path(opi_var_open_mpi_path) - - # > Case 3: Specified in config file - elif mpi_path_config: - mpi_path = Path(mpi_path_config) - - # > Case 4: MPI is already in the $PATH - # > Then we don't need to do anything. - # > Assumes that $LD_LIBRARY_PATH is also properly configured. - # > Case 5: Not configured/installed at all. - # In this case, ORCA can only be executed with a single core. 
- # <<< END OF IF-BLOCK - - # > Now determine the bin/ and lib/ folder - if mpi_path: - self._open_mpi_path = mpi_path.expanduser().resolve(strict=True) - - def get_orca_binary(self, binary: OrcaBinary, /) -> Path: - """ - Get absolute path to any of ORCA binaries according to `self._orca_bin_path`. - - Parameters - ---------- - binary : OrcaBinary - Name of ORCA binary to search for. - """ - - assert self._orca_bin_folder is not None - - bin_name = resolve_binary_name(str(binary)) - - # > Full path to ORCA binary - orca_binary = self._orca_bin_folder / bin_name - - if not orca_binary.is_file(): - raise FileNotFoundError(f"The ORCA binary does not exist: {orca_binary}") - else: - return orca_binary - def run_orca_2json(self, args: Sequence[str] = (), /) -> None: """ Execute `orca_2json` with given arguments. diff --git a/src/opi/execution/mm.py b/src/opi/execution/mm.py new file mode 100644 index 00000000..70cb0e7c --- /dev/null +++ b/src/opi/execution/mm.py @@ -0,0 +1,545 @@ +""" +Module that contains `OrcaMmRunner` class which facilitates execution of `orca_mm`. + +Attributes +---------- +OrcaMmCommand: + Helper type for supported `orca_mm` commands. +ForcefieldType: + Helper type for supported forcefield input formats used by `-convff`. +ChargeOption: + Helper type for supported charge calculation options used by `-makeff`. 
+"""
+
+from contextlib import contextmanager
+from pathlib import Path
+import tempfile
+from typing import Iterable, Iterator, Literal, Sequence
+
+from opi.execution.base import BaseRunner
+from opi.lib.orca_binary import OrcaBinary
+
+
+# > Subcommands of `orca_mm`. Each one is passed to the binary as `-<command>`.
+OrcaMmCommand = Literal[
+    "convff", "splitff", "mergeff", "repeatff", "splitpdb", "mergepdb", "makeff", "getHDist"
+]
+# > Input forcefield formats of `run_convff()`. Passed to the binary as `-<format>`.
+ForcefieldType = Literal["amber", "charmm", "openmm"]
+
+# > Charge-calculation modes of `run_makeff()`. Passed to the binary as `-<option>`.
+ChargeOption = Literal[
+    "PBE",
+    "PBEOpt",
+    "PBEOptH",
+    "XTB",
+    "XTBOpt",
+    "XTBOptH",
+    "XTBOptPBE",
+    "noChargeCalc",
+]
+
+
+class OrcaMmException(Exception):
+    """
+    Exception raised when an `orca_mm` command reports an error.
+
+    Attributes
+    ----------
+    command : OrcaMmCommand
+        `orca_mm` subcommand that was executed.
+    arguments : list[str]
+        Command-line arguments passed to `orca_mm`.
+    error : str
+        Unmodified error output captured from `orca_mm` STDERR.
+    """
+
+    def __init__(self, command: OrcaMmCommand, arguments: Sequence[str], error: str) -> None:
+        """
+        Parameters
+        ----------
+        command : OrcaMmCommand
+            `orca_mm` subcommand that was executed.
+        arguments : Sequence[str]
+            Command-line arguments passed to `orca_mm`.
+        error : str
+            Error output captured from `orca_mm` STDERR.
+        """
+        # > Keep the raw pieces available, so callers do not have to parse the message.
+        self.command = command
+        self.arguments = list(arguments)
+        self.error = error
+
+        fmt_arguments = " ".join(arguments)
+        message = (
+            f"Failed running command: 'orca_mm -{command} {fmt_arguments}'.\n"
+            f"orca_mm failed with the following error:\n{error.strip()}"
+        )
+        super().__init__(message)
+
+
+def _add_infix_to_path(path: Path, infix: str, suffix: str) -> Path:
+    """
+    Adds `infix` to the `path` whilst preserving the given `suffix`.
+
+    If the filename does not end with `suffix`, nothing is stripped and `infix` and `suffix` are
+    simply appended to the full filename.
+
+    Parameters
+    ----------
+    path : Path
+        Source file path.
+    infix : str
+        String inserted before `suffix` in the filename.
+    suffix : str
+        Suffix of file path to preserve.
+
+    Returns
+    -------
+    Path
+        Updated path in the same directory.
+
+    Examples
+    --------
+    >>> path = Path.cwd() / "system.ORCAFF.prms"
+    >>> suffix = ".ORCAFF.prms"
+    >>> merged = _add_infix_to_path(path, "_merged", suffix)
+    >>> merged.name
+    'system_merged.ORCAFF.prms'
+    >>> merged.parent == Path.cwd()
+    True
+    """
+
+    return path.parent / f"{path.name.removesuffix(suffix)}{infix}{suffix}"
+
+
+class OrcaMmRunner(BaseRunner):
+    """
+    Runner for the `orca_mm` binary.
+
+    Each `run_*` method wraps one `orca_mm` subcommand, verifies that the output file(s) it
+    expects exist afterwards and returns their path(s).
+    The expected output paths are derived from the input paths and checked with `Path.exists()`.
+    Relative paths are therefore resolved against the working directory of the Python process.
+    """
+
+    # > Filename suffix of ORCA forcefield files.
+    _orca_ff_suffix = ".ORCAFF.prms"
+
+    @staticmethod
+    @contextmanager
+    def _expect_output_files(*expected_outputs: Path) -> Iterator[None]:
+        """
+        Context manager that checks expected output files after command execution.
+
+        The check only runs if the wrapped block finishes without raising.
+        An exception raised inside the `with` body propagates unchanged.
+
+        Parameters
+        ----------
+        *expected_outputs : Path
+            Output files that must exist after the wrapped command has finished successfully.
+
+        Raises
+        ------
+        FileNotFoundError
+            If at least one of the expected output files does not exist.
+        """
+        # > An exception from the wrapped block is re-raised at this `yield`,
+        # > which skips the check below.
+        yield
+
+        missing_outputs = [path for path in expected_outputs if not path.exists()]
+        if not missing_outputs:
+            return
+
+        formatted_outputs = ", ".join(f"'{path}'" for path in missing_outputs)
+        raise FileNotFoundError(f"Expected output file(s) do not exist: {formatted_outputs}.")
+
+    @staticmethod
+    def _sorted_split_atoms(atoms: Iterable[int]) -> list[int]:
+        """
+        Validates and sorts the atom indices used as split points.
+
+        Parameters
+        ----------
+        atoms : Iterable[int]
+            1-based atom indices used as split points.
+
+        Returns
+        -------
+        list[int]
+            Atom indices in ascending order.
+
+        Raises
+        ------
+        ValueError
+            If no atom is given or if any atom index is smaller than 1.
+        """
+        sorted_atoms = sorted(atoms)
+
+        if not sorted_atoms:
+            raise ValueError("Must supply at least 1 atom.")
+
+        if any(atom < 1 for atom in sorted_atoms):
+            raise ValueError("All atoms must be positive integers.")
+
+        return sorted_atoms
+
+    def run_orca_mm(
+        self,
+        command: OrcaMmCommand,
+        arguments: Sequence[str],
+        *,
+        raise_on_error: bool = True,
+        silent: bool = True,
+        timeout: int = -1,
+    ) -> None:
+        """
+        Execute `orca_mm` with the provided subcommand and arguments.
+
+        Parameters
+        ----------
+        command : OrcaMmCommand
+            `orca_mm` subcommand to execute.
+        arguments : Sequence[str]
+            Command-line arguments passed to the subcommand.
+        raise_on_error : bool, default: True
+            Raise `OrcaMmException` if `orca_mm` writes anything to STDERR.
+        silent : bool, default: True
+            Capture and discard STDOUT and STDERR.
+        timeout : int, default: -1
+            Optional timeout in seconds to wait for process to complete.
+
+        Raises
+        ------
+        OrcaMmException
+            If `raise_on_error` is set and `orca_mm` reports an error.
+        """
+        # > STDERR is redirected into a throw-away file, so it can be inspected after the run.
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            path = Path(tmp_dir) / "stderr.txt"
+            self.run(
+                OrcaBinary.ORCA_MM,
+                [f"-{command}", *arguments],
+                stderr=path,
+                silent=silent,
+                timeout=timeout,
+            )
+            # > The STDERR file may no longer exist after the run (see unit tests).
+            # > A missing or empty file is treated as "no error".
+            if raise_on_error and path.exists() and (error := path.read_text()):
+                raise OrcaMmException(command, arguments, error)
+
+    def run_convff(
+        self,
+        ffinput: ForcefieldType,
+        ff_files: Iterable[Path],
+        *,
+        force: bool = False,
+        raise_on_error: bool = True,
+        silent: bool = True,
+        timeout: int = -1,
+    ) -> Path:
+        """
+        Executes the `orca_mm` binary with the `-convff` flag and passes in the forcefield type
+        and forcefield files as arguments to the binary.
+
+        If the output file already exists and `force` is not set, `orca_mm` is not executed and
+        the path to the existing file is returned.
+
+        Parameters
+        ----------
+        ffinput : ForcefieldType
+            Input forcefield format (`amber`, `charmm`, or `openmm`).
+        ff_files : Iterable[Path]
+            Input forcefield file(s) to convert.
+        force : bool, default: False
+            Delete an existing output file and run the conversion again.
+        raise_on_error : bool, default: True
+            Raise `OrcaMmException` if `orca_mm` reports an error.
+        silent : bool, default: True
+            Capture and discard STDOUT and STDERR.
+        timeout : int, default: -1
+            Optional timeout in seconds to wait for process to complete.
+
+        Returns
+        -------
+        Path
+            Path to generated ORCA forcefield file (`*.ORCAFF.prms`).
+
+        Raises
+        ------
+        ValueError
+            If `ff_files` is empty.
+        OrcaMmException
+            If `raise_on_error` is set and `orca_mm` reports an error.
+        FileNotFoundError
+            If the expected output file does not exist after the command has finished.
+        """
+        # > Materialize once: the iterable is indexed and iterated below.
+        files = list(ff_files)
+
+        if not files:
+            raise ValueError("Must supply at least 1 forcefield file.")
+
+        # > The output is named after the first input file.
+        expected_output = files[0].with_suffix(self._orca_ff_suffix)
+        if expected_output.is_file() and not force:
+            return expected_output
+
+        # > Remove a stale output, so the existence check below cannot be fooled by it.
+        expected_output.unlink(missing_ok=True)
+
+        arguments = [f"-{ffinput}"] + [str(f) for f in files]
+        with self._expect_output_files(expected_output):
+            self.run_orca_mm(
+                "convff", arguments, raise_on_error=raise_on_error, silent=silent, timeout=timeout
+            )
+
+        return expected_output
+
+    def run_splitff(
+        self,
+        orcaff_file: Path,
+        *atoms: int,
+        raise_on_error: bool = True,
+        silent: bool = True,
+        timeout: int = -1,
+    ) -> list[Path]:
+        """
+        Execute `orca_mm -splitff` and split an ORCA forcefield file at selected atom indices.
+
+        Parameters
+        ----------
+        orcaff_file : Path
+            Path to ORCA forcefield file (`*.ORCAFF.prms`) that will be split.
+        *atoms : int
+            1-based atom indices used as split points.
+        raise_on_error : bool, default: True
+            Raise `OrcaMmException` if `orca_mm` reports an error.
+        silent : bool, default: True
+            Capture and discard STDOUT and STDERR.
+        timeout : int, default: -1
+            Optional timeout in seconds to wait for process to complete.
+
+        Returns
+        -------
+        list[Path]
+            Paths to generated split forcefield files.
+
+        Raises
+        ------
+        ValueError
+            If no atom is given or if any atom index is smaller than 1.
+        OrcaMmException
+            If `raise_on_error` is set and `orca_mm` reports an error.
+        FileNotFoundError
+            If an expected output file does not exist after the command has finished.
+        """
+        sorted_atoms = self._sorted_split_atoms(atoms)
+
+        # > N split points are expected to yield N + 1 files, numbered from 1.
+        expected_outputs = [
+            _add_infix_to_path(orcaff_file, f"_split{split + 1}", self._orca_ff_suffix)
+            for split in range(len(sorted_atoms) + 1)
+        ]
+
+        arguments = [f"{orcaff_file}"] + [str(atom) for atom in sorted_atoms]
+        with self._expect_output_files(*expected_outputs):
+            self.run_orca_mm(
+                "splitff", arguments, raise_on_error=raise_on_error, silent=silent, timeout=timeout
+            )
+
+        return expected_outputs
+
+    def run_mergeff(
+        self,
+        *orcaff_files: Path,
+        raise_on_error: bool = True,
+        silent: bool = True,
+        timeout: int = -1,
+    ) -> Path:
+        """
+        Execute `orca_mm -mergeff` to merge multiple ORCA forcefield files.
+
+        Parameters
+        ----------
+        *orcaff_files : Path
+            ORCA forcefield files (`*.ORCAFF.prms`) to merge.
+        raise_on_error : bool, default: True
+            Raise `OrcaMmException` if `orca_mm` reports an error.
+        silent : bool, default: True
+            Capture and discard STDOUT and STDERR.
+        timeout : int, default: -1
+            Optional timeout in seconds to wait for process to complete.
+
+        Returns
+        -------
+        Path
+            Path to merged ORCA forcefield file.
+
+        Raises
+        ------
+        ValueError
+            If fewer than 2 files are given.
+        OrcaMmException
+            If `raise_on_error` is set and `orca_mm` reports an error.
+        FileNotFoundError
+            If the expected output file does not exist after the command has finished.
+        """
+        if len(orcaff_files) < 2:
+            raise ValueError("Must provide at least 2 orca ff files to merge")
+
+        # > The output is named after the first input file.
+        expected_output = _add_infix_to_path(orcaff_files[0], "_merged", self._orca_ff_suffix)
+
+        arguments = [str(f) for f in orcaff_files]
+        with self._expect_output_files(expected_output):
+            self.run_orca_mm(
+                "mergeff", arguments, raise_on_error=raise_on_error, silent=silent, timeout=timeout
+            )
+
+        return expected_output
+
+    def run_repeatff(
+        self,
+        orcaff_file: Path,
+        repeat: int,
+        *,
+        raise_on_error: bool = True,
+        silent: bool = True,
+        timeout: int = -1,
+    ) -> Path:
+        """
+        Execute `orca_mm -repeatff` to repeat a forcefield topology a fixed number of times.
+
+        Parameters
+        ----------
+        orcaff_file : Path
+            ORCA forcefield file (`*.ORCAFF.prms`) to repeat.
+        repeat : int
+            Number of repetitions. Must be a positive integer.
+        raise_on_error : bool, default: True
+            Raise `OrcaMmException` if `orca_mm` reports an error.
+        silent : bool, default: True
+            Capture and discard STDOUT and STDERR.
+        timeout : int, default: -1
+            Optional timeout in seconds to wait for process to complete.
+
+        Returns
+        -------
+        Path
+            Path to repeated ORCA forcefield file.
+
+        Raises
+        ------
+        ValueError
+            If `repeat` is smaller than 1.
+        OrcaMmException
+            If `raise_on_error` is set and `orca_mm` reports an error.
+        FileNotFoundError
+            If the expected output file does not exist after the command has finished.
+        """
+        if repeat < 1:
+            raise ValueError("'repeat' must be a positive integer")
+
+        expected_output = _add_infix_to_path(orcaff_file, f"_repeat{repeat}", self._orca_ff_suffix)
+
+        arguments = [f"{orcaff_file}", str(repeat)]
+        with self._expect_output_files(expected_output):
+            self.run_orca_mm(
+                "repeatff",
+                arguments,
+                raise_on_error=raise_on_error,
+                silent=silent,
+                timeout=timeout,
+            )
+
+        return expected_output
+
+    def run_splitpdb(
+        self,
+        pdb_file: Path,
+        *atoms: int,
+        raise_on_error: bool = True,
+        silent: bool = True,
+        timeout: int = -1,
+    ) -> list[Path]:
+        """
+        Execute `orca_mm -splitpdb` and split a PDB structure at selected atom indices.
+
+        Parameters
+        ----------
+        pdb_file : Path
+            Path to PDB file that will be split.
+        *atoms : int
+            1-based atom indices used as split points.
+        raise_on_error : bool, default: True
+            Raise `OrcaMmException` if `orca_mm` reports an error.
+        silent : bool, default: True
+            Capture and discard STDOUT and STDERR.
+        timeout : int, default: -1
+            Optional timeout in seconds to wait for process to complete.
+
+        Returns
+        -------
+        list[Path]
+            Paths to generated split PDB files.
+
+        Raises
+        ------
+        ValueError
+            If no atom is given or if any atom index is smaller than 1.
+        OrcaMmException
+            If `raise_on_error` is set and `orca_mm` reports an error.
+        FileNotFoundError
+            If an expected output file does not exist after the command has finished.
+        """
+        sorted_atoms = self._sorted_split_atoms(atoms)
+
+        # > N split points are expected to yield N + 1 files, numbered from 1.
+        expected_outputs = [
+            _add_infix_to_path(pdb_file, f"_split{split + 1}", ".pdb")
+            for split in range(len(sorted_atoms) + 1)
+        ]
+
+        arguments = [f"{pdb_file}"] + [str(atom) for atom in sorted_atoms]
+        with self._expect_output_files(*expected_outputs):
+            self.run_orca_mm(
+                "splitpdb", arguments, raise_on_error=raise_on_error, silent=silent, timeout=timeout
+            )
+
+        return expected_outputs
+
+    def run_mergepdb(
+        self,
+        *pdb_files: Path,
+        raise_on_error: bool = True,
+        silent: bool = True,
+        timeout: int = -1,
+    ) -> Path:
+        """
+        Execute `orca_mm -mergepdb` to merge multiple PDB files.
+
+        Parameters
+        ----------
+        *pdb_files : Path
+            PDB files to merge.
+        raise_on_error : bool, default: True
+            Raise `OrcaMmException` if `orca_mm` reports an error.
+        silent : bool, default: True
+            Capture and discard STDOUT and STDERR.
+        timeout : int, default: -1
+            Optional timeout in seconds to wait for process to complete.
+
+        Returns
+        -------
+        Path
+            Path to merged PDB file.
+
+        Raises
+        ------
+        ValueError
+            If fewer than 2 files are given.
+        OrcaMmException
+            If `raise_on_error` is set and `orca_mm` reports an error.
+        FileNotFoundError
+            If the expected output file does not exist after the command has finished.
+        """
+        if len(pdb_files) < 2:
+            raise ValueError("Must provide at least 2 PDB files to merge")
+
+        # > The output is named after the first input file.
+        expected_output = _add_infix_to_path(pdb_files[0], "_merged", ".pdb")
+
+        arguments = [str(f) for f in pdb_files]
+        with self._expect_output_files(expected_output):
+            self.run_orca_mm(
+                "mergepdb", arguments, raise_on_error=raise_on_error, silent=silent, timeout=timeout
+            )
+
+        return expected_output
+
+    def run_makeff(
+        self,
+        structure_file: Path,
+        *,
+        charge: int | None = None,
+        multiplicity: int | None = None,
+        nproc: int | None = None,
+        charge_option: ChargeOption | None = None,
+        oxidation_states: dict[str, float] | None = None,
+        raise_on_error: bool = True,
+        silent: bool = True,
+        timeout: int = -1,
+    ) -> Path:
+        """
+        Execute `orca_mm -makeff` to generate an ORCA forcefield file from a structure file.
+
+        Options left at `None` are not passed to `orca_mm` at all.
+
+        Parameters
+        ----------
+        structure_file : Path
+            Input structure file for forcefield generation.
+        charge : int | None, default: None
+            Total molecular charge passed via `-C`.
+        multiplicity : int | None, default: None
+            Spin multiplicity passed via `-M`. Must be a positive integer.
+        nproc : int | None, default: None
+            Number of processes passed via `-nproc`.
+        charge_option : ChargeOption | None, default: None
+            Charge-calculation mode for forcefield generation.
+        oxidation_states : dict[str, float] | None, default: None
+            Optional element-to-oxidation-state mapping passed via repeated `-CEL` flags.
+            Each oxidation state is formatted with one decimal place.
+        raise_on_error : bool, default: True
+            Raise `OrcaMmException` if `orca_mm` reports an error.
+        silent : bool, default: True
+            Capture and discard STDOUT and STDERR.
+        timeout : int, default: -1
+            Optional timeout in seconds to wait for process to complete.
+
+        Returns
+        -------
+        Path
+            Path to generated ORCA forcefield file (`*.ORCAFF.prms`).
+
+        Raises
+        ------
+        ValueError
+            If `multiplicity` is smaller than 1.
+        OrcaMmException
+            If `raise_on_error` is set and `orca_mm` reports an error.
+        FileNotFoundError
+            If the expected output file does not exist after the command has finished.
+        """
+        expected_output = structure_file.with_suffix(self._orca_ff_suffix)
+
+        arguments = [str(structure_file)]
+
+        if charge is not None:
+            arguments.extend(("-C", str(charge)))
+
+        if multiplicity is not None:
+            if multiplicity < 1:
+                raise ValueError("Multiplicity must be a positive integer")
+            arguments.extend(("-M", str(multiplicity)))
+
+        if nproc is not None:
+            arguments.extend(("-nproc", str(nproc)))
+
+        if charge_option is not None:
+            arguments.append(f"-{charge_option}")
+
+        if oxidation_states is not None:
+            # > One `-CEL <element> <oxidation state>` triple per element.
+            for element, oxidation_state in oxidation_states.items():
+                arguments.extend(("-CEL", str(element), f"{float(oxidation_state):.1f}"))
+
+        with self._expect_output_files(expected_output):
+            self.run_orca_mm(
+                "makeff", arguments, raise_on_error=raise_on_error, silent=silent, timeout=timeout
+            )
+
+        return expected_output
+
+    def run_get_h_dist(
+        self,
+        structure_file: Path,
+        *,
+        raise_on_error: bool = True,
+        silent: bool = True,
+        timeout: int = -1,
+    ) -> Path:
+        """
+        Execute `orca_mm -getHDist` to generate hydrogen-distance parameters from a structure.
+
+        Parameters
+        ----------
+        structure_file : Path
+            Input structure file for hydrogen-distance analysis.
+        raise_on_error : bool, default: True
+            Raise `OrcaMmException` if `orca_mm` reports an error.
+        silent : bool, default: True
+            Capture and discard STDOUT and STDERR.
+        timeout : int, default: -1
+            Optional timeout in seconds to wait for process to complete.
+
+        Returns
+        -------
+        Path
+            Path to generated `*.H_DIST.prms` file.
+
+        Raises
+        ------
+        OrcaMmException
+            If `raise_on_error` is set and `orca_mm` reports an error.
+        FileNotFoundError
+            If the expected output file does not exist after the command has finished.
+        """
+        expected_output = structure_file.with_suffix(".H_DIST.prms")
+
+        arguments = [str(structure_file)]
+        with self._expect_output_files(expected_output):
+            self.run_orca_mm(
+                "getHDist", arguments, raise_on_error=raise_on_error, silent=silent, timeout=timeout
+            )
+
+        return expected_output
diff --git a/src/opi/lib/orca_binary.py b/src/opi/lib/orca_binary.py
index 14253f22..c8033458 100644
--- a/src/opi/lib/orca_binary.py
+++ b/src/opi/lib/orca_binary.py
@@ -18,3 +18,4 @@ class OrcaBinary(StrEnum):
     ORCA_PLOT = "orca_plot"
     ORCA_PLTVIB = "orca_pltvib"
     ORCA_VIB = "orca_vib"
+    ORCA_MM = "orca_mm"
diff --git a/tests/unit/test_execution_mm.py b/tests/unit/test_execution_mm.py
new file mode 100644
index 00000000..ec21b105
--- /dev/null
+++ b/tests/unit/test_execution_mm.py
@@ -0,0 +1,50 @@
+from pathlib import Path
+
+import pytest
+
+from opi.execution.mm import OrcaMmException, OrcaMmRunner
+
+
+def _set_fake_orca_path(self, orca_path: Path | None = None) -> None:
+    self._orca_bin_folder = Path("/tmp")
+    self._orca_lib_folder = Path("/tmp")
+
+
+def _set_fake_open_mpi_path(self, mpi_path: Path | None = None) -> None:
+    self._open_mpi_path = None
+
+
+@pytest.mark.unit
+def test_run_orca_mm_handles_deleted_empty_stderr(monkeypatch: pytest.MonkeyPatch):
+    monkeypatch.setattr(OrcaMmRunner, "set_orca_path", _set_fake_orca_path)
+    monkeypatch.setattr(OrcaMmRunner, "set_open_mpi_path", _set_fake_open_mpi_path)
+
+    runner = OrcaMmRunner()
+
+    def fake_run(binary, args, /, *, stderr=None, silent=True, timeout=-1):
+        assert stderr is not None
+        stderr.write_text("")
+        stderr.unlink()
+        return None
+
+    monkeypatch.setattr(runner, "run", fake_run)
+
+    runner.run_orca_mm("convff", ["-amber", "test.prm"])
+
+
+@pytest.mark.unit
+def test_run_orca_mm_raises_when_stderr_contains_output(monkeypatch: pytest.MonkeyPatch):
+    monkeypatch.setattr(OrcaMmRunner, "set_orca_path", _set_fake_orca_path)
+    monkeypatch.setattr(OrcaMmRunner, "set_open_mpi_path", _set_fake_open_mpi_path)
+
+    runner = OrcaMmRunner()
+
+    def fake_run(binary, args, /, *, stderr=None, silent=True, timeout=-1):
+        assert stderr is not None
+        stderr.write_text("orca_mm failed")
+        return None
+
+    monkeypatch.setattr(runner, "run", fake_run)
+
+    with pytest.raises(OrcaMmException, match="orca_mm failed"):
+        runner.run_orca_mm("convff", ["-amber", "test.prm"])