From 302ea768f8835c41e0c490b4ea79951031714a0f Mon Sep 17 00:00:00 2001 From: Niolon Date: Mon, 22 Sep 2025 12:18:24 +0100 Subject: [PATCH 1/2] Adapt the function to be able to be used with the stock olex2 distribution --- pyproject.toml | 1 - qcrboxtools/robots/basesocket.py | 12 +- qcrboxtools/robots/olex2.py | 542 ++++++++++++++++++++++--------- tests/robots/olex/test_olex2.py | 503 +++++++++++++++++++--------- 4 files changed, 745 insertions(+), 313 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 42c7762..4f9fb1e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,6 @@ indent-width = 4 line-length = 120 [tool.ruff.lint] -ignore-init-module-imports = true select = [ # pycodestyle "E", diff --git a/qcrboxtools/robots/basesocket.py b/qcrboxtools/robots/basesocket.py index 141cef8..82b3c66 100644 --- a/qcrboxtools/robots/basesocket.py +++ b/qcrboxtools/robots/basesocket.py @@ -46,7 +46,11 @@ def _send_input(self, input_str: str) -> str: """ input_str += "\n" with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.connect((self.server, self.port)) - s.sendall(input_str.encode("UTF-8")) - data = s.recv(1024) - return data.decode() + try: + s.connect((self.server, self.port)) + s.sendall(input_str.encode("UTF-8")) + data = s.recv(1024) + except (ConnectionRefusedError, OSError) as ex: + raise ConnectionError(f"Could not connect to Socket server, {self.server}:{self.port}") from ex + + return data.decode("UTF-8").rstrip("\n") diff --git a/qcrboxtools/robots/olex2.py b/qcrboxtools/robots/olex2.py index f6daab5..46b3a7e 100644 --- a/qcrboxtools/robots/olex2.py +++ b/qcrboxtools/robots/olex2.py @@ -2,237 +2,463 @@ # SPDX-License-Identifier: MPL-2.0 """ -This module provides functionality for interacting with the Olex2 refinement program -in server mode via sockets. +Socket-based interface for Olex2 crystallographic refinement program. + +This module provides the `Olex2Socket` class for interacting with the Olex2 +crystallographic refinement program running in server mode. It enables automated +structure refinement, including support for aspherical atom form factors via +NoSpherA2 integration. + +The module extends the base socket functionality to provide Olex2-specific +command generation, job management, and refinement workflows. + +Classes +------- +Olex2Socket + Socket client for automated Olex2 refinement operations. + +Examples +-------- +>>> from qcrboxtools.robots.olex2 import Olex2Socket +>>> from pathlib import Path +>>> +>>> # Connect to Olex2 server +>>> olex = Olex2Socket("localhost", 8899) +>>> +>>> # Run refinement +>>> cif_file = Path("structure.cif") +>>> result = olex.run_full_refinement(cif_file, n_cycles=20) """ import os -import pathlib import shutil import time import warnings from itertools import count -from typing import Any +from pathlib import Path from .basesocket import SocketRobot class Olex2Socket(SocketRobot): """ - A specialized socket client for Olex2, which provides functionalities like - loading and refining a structure, as well as sending general commands to the - Olex2 server. - - Attributes: - - structure_path (pathlib.Path): The path to the structure file. + Socket client for automated Olex2 crystallographic refinement operations. + + This class provides a high-level interface for interacting with the Olex2 + crystallographic refinement program running in server mode. It supports + structure loading, refinement execution, and integration with aspherical + atom form factors via NoSpherA2. + + The class manages job queuing, command generation, and result retrieval + to enable automated refinement workflows. + + Parameters + ---------- + olex_server : str, optional + The hostname or IP address of the Olex2 server. Defaults to "localhost" + or the value of the OLEX2SERVER environment variable if set. + port : int, optional + The port number on which the Olex2 server is listening. Defaults to 8899 + or the value of the OLEX2PORT environment variable if set. + + Attributes + ---------- + _task_id_counter : itertools.count + Counter for generating unique task identifiers. + + Methods + ------- + run_full_refinement(cif_path, tsc_path=None, n_cycles=20, refine_starts=5) + Execute a complete refinement workflow. + send_command(cif_path, tsc_path, input_str) + Send arbitrary commands to the Olex2 server. + check_connection() + Verify the connection to the Olex2 server. + wait_for_completion(timeout_counter, input_str) + Wait for a submitted job to complete. + + Examples + -------- + >>> olex = Olex2Socket("localhost", 8899) + >>> result = olex.run_full_refinement(Path("structure.cif")) + >>> print("Refinement completed") """ - _structure_path = None - _tsc_path = None - _task_id_counter = count() - - def __init__(self, olex_server: str = "localhost", port: int = 8899, structure_path: str = None): + def __init__( + self, + olex_server: str = "localhost", + port: int = 8899, + ): """ - Initializes the Olex2Socket with server details and an optional structure path. - - Args: - - olex_server (str): The Olex2 server's address. Defaults to the - environment variable $OLEX2SERVER or if that is unavailable: 'localhost'. - - port (int): The port number on which the Olex2 server is listening. Defaults to - the environment variable $OLEX2PORT or if that is unavailable: 8899. - - structure_path (str): The path to the structure file. If provided, will be set - for the instance, otherwise needs to be set later for refinement. + Initialize the Olex2Socket client. + + Sets up connection parameters and initializes the task counter for job + management. Environment variables OLEX2SERVER and OLEX2PORT are used + if available and default parameters are not explicitly provided. + + Parameters + ---------- + olex_server : str, optional + The hostname or IP address of the Olex2 server. Defaults to "localhost" + or the value of the OLEX2SERVER environment variable if set. + port : int, optional + The port number on which the Olex2 server is listening. Defaults to 8899 + or the value of the OLEX2PORT environment variable if set. + + Notes + ----- + The connection is not established during initialization. Use the inherited + socket methods to connect when needed. """ if olex_server == "localhost" and "OLEX2SERVER" in os.environ: olex_server = os.environ["OLEX2SERVER"] if port == 8899 and "OLEX2PORT" in os.environ: - port = os.environ["OLEX2PORT"] + port = int(os.environ["OLEX2PORT"]) super().__init__(olex_server, port) - if structure_path is not None: - self.structure_path = structure_path - - @property - def structure_path(self): - """Returns the path of the structure file.""" - return self._structure_path + self._task_id_counter = count() - @structure_path.setter - def structure_path(self, path: str): + def move_refined_files(self, cif_path: Path): """ - Sets the path of the structure file loads the structure with the path into Olex2. + Move existing .ins and .hkl files to prevent conflicts during refinement. - Args: - - path (str): The path to the structure file. - """ - path = pathlib.Path(path) - self._structure_path = path - - working_dir = path.absolute().parents[0] + Before starting refinement, this method backs up any existing .ins and .hkl + files by copying them with a "_moved" suffix and removing the originals. + This prevents conflicts when Olex2 creates new files during refinement. - if path.absolute().with_suffix(".ins").exists(): + Parameters + ---------- + cif_path : Path + The path to the CIF file. The .ins and .hkl files are expected to + have the same base name but with their respective extensions. + """ + if cif_path.absolute().with_suffix(".ins").exists(): shutil.copy( - path.absolute().with_suffix(".ins"), - str(path.absolute().with_suffix("")) + "_moved.ins", + cif_path.absolute().with_suffix(".ins"), + str(cif_path.absolute().with_suffix("")) + "_moved.ins", ) - path.absolute().with_suffix(".ins").unlink() + cif_path.absolute().with_suffix(".ins").unlink() - if path.absolute().with_suffix(".hkl").exists(): + if cif_path.absolute().with_suffix(".hkl").exists(): shutil.copy( - path.absolute().with_suffix(".hkl"), - str(path.absolute().with_suffix("")) + "_moved.hkl", + cif_path.absolute().with_suffix(".hkl"), + str(cif_path.absolute().with_suffix("")) + "_moved.hkl", ) - path.absolute().with_suffix(".hkl").unlink() + cif_path.absolute().with_suffix(".hkl").unlink() - log_indexes = [int(filename.name[5:-4]) for filename in working_dir.glob("task_*.log")] - - if len(log_indexes) > 0: - self._task_id_counter = count(max(log_indexes) + 1) - else: - self._task_id_counter = count() - - startup_commands = [f"user {working_dir}", f"reap {path.absolute()}"] + def create_structure_load_cmds(self, cif_path: Path) -> list[str]: + """ + Generate Olex2 commands for loading a crystallographic structure. - cmd_list = "\n".join(startup_commands) - cmd = f"run:startup\n{cmd_list}" - self._send_input(cmd) + Creates a sequence of Olex2 commands to load a CIF file, set up the + working directory, and prepare associated files (.ins and .hkl) for + refinement operations. - self.wait_for_completion(2000, "startup", cmd) + Parameters + ---------- + cif_path : Path + The path to the CIF file to be loaded. - load_cmds = [ - f"file {path.with_suffix('.ins').name}", - f"export {path.with_suffix('.hkl').name}", - f"reap {path.with_suffix('.ins').name}", + Returns + ------- + list[str] + A list of Olex2 commands for structure loading and setup. + """ + return [ + f"if strcmp('{cif_path}','none') then none else spy.saveHistory()", + "spy.SaveStructureParams()", + f"if strcmp('{cif_path}','none') then none else 'spy.SaveCifInfo()'", + "if IsFileLoaded() then clear", + f"user {str(cif_path.parent)}", + f"@reap {cif_path}", + "spy.OnStructureLoaded(filename())", + f"file {cif_path.with_suffix('.ins').name}", + f"@export {cif_path.with_suffix('.hkl').name}", + f"@reap {cif_path.with_suffix('.ins')}", ] - try: - self.send_command("\n".join(load_cmds)) - except RuntimeError as exc: - warnings.warn("There has been an error during loading: " + str(exc)) - - # ensures that the tsc file is copied when not in same folder - if self.tsc_path is not None: - self.tsc_path = self.tsc_path - - @property - def tsc_path(self): - """Returns the path of the currently selected tsc(b) file.""" - return self._tsc_path - @tsc_path.setter - def tsc_path(self, path): + def create_startup_cmds(self, cif_path: Path, job_id: int) -> list[str]: """ - Sets the path of the tsc(b), if not None, activate aspherical refinement. - - Args: - - path (str): The path to the tsc(b) file. + Generate Olex2 startup commands for job initialization. + + Creates commands to set up the Olex2 environment for a specific job, + including logging, working directory, job identification, and client + mode configuration. + + Parameters + ---------- + cif_path : Path + The path to the CIF file being processed. + job_id : int + Unique identifier for the current job. + + Returns + ------- + list[str] + A list of Olex2 startup commands. """ - if self.structure_path is None: - raise ValueError( - "No structure loaded to refine. The structure_path attribute needs to be set before setting a tsc_path." - ) + return [ + f"run:task_{job_id}", + f"xlog:{str(cif_path.parent / f'task_{job_id}.log')}", + f"user {str(cif_path.parent)}", + "SetVar server.job_id " + f"task_{job_id}", + "SetVar olex2.remote_mode true", + "spy.LoadParams 'user,olex2'", + "spy.SetParam user.refinement.client_mode False", + "SetOlex2RefinementListener(True)", + ] - if path is not None: - path = pathlib.Path(path) - if not path.exists(): - raise FileNotFoundError(f"File {path} does not exist.") - if path.absolute().parent != self.structure_path.absolute().parent: - shutil.copy(path, self.structure_path.parent / path.name) - cmds = [ - "spy.SetParam('snum.NoSpherA2.use_aspherical', True)", - "spy.SetParam('snum.NoSpherA2.Calculate', False)", - "spy.SetParam('snum.NoSpherA2.source', 'tsc_file')", - f"spy.SetParam('snum.NoSpherA2.file', '{path.name}')", - ] - self.send_command("\n".join(cmds)) - else: - cmds = [ + def create_nosphera2_cmds(self, tsc_path: Path | None) -> list[str]: + """ + Generate NoSpherA2 configuration commands for aspherical refinement. + + Creates Olex2 commands to configure NoSpherA2 settings for either + aspherical or spherical atom form factors based on whether a TSC + file is provided. + + Parameters + ---------- + tsc_path : Path or None + The path to the TSC (or TSCB) file containing aspherical form factors. + If None, aspherical refinement is disabled. + + Returns + ------- + list[str] + A list of Olex2 commands for NoSpherA2 configuration. + """ + if tsc_path is None: + return [ "spy.SetParam('snum.NoSpherA2.use_aspherical', False)", "spy.SetParam('snum.NoSpherA2.Calculate', False)", "spy.SetParam('snum.NoSpherA2.file', None)", ] - self.send_command("\n".join(cmds)) - self._tsc_path = path + return [ + "spy.SetParam('snum.NoSpherA2.use_aspherical', True)", + "spy.SetParam('snum.NoSpherA2.Calculate', False)", + "spy.SetParam('snum.NoSpherA2.source', 'tsc_file')", + f"spy.SetParam('snum.NoSpherA2.file', '{tsc_path.name}')", + ] + + def create_refine_cmds(self, n_cycles: int, refine_starts: int) -> list[str]: + """ + Generate refinement commands for crystallographic optimization. + + Creates a sequence of Olex2 commands to set up the refinement program, + configure instruction file entries, and execute refinement cycles. + + Parameters + ---------- + n_cycles : int + The number of least-squares cycles to perform in each refinement run. + refine_starts : int + The number of times to restart the refinement process to adapt a + weighting scheme iteratively. + + Returns + ------- + list[str] + A list of Olex2 refinement commands. + """ + return [ + "spy.set_refinement_program(olex2.refine, Gauss-Newton)", + "DelIns ACTA", + "AddIns ACTA", + ] + [f"refine {n_cycles}"] * refine_starts + + def reserve_new_job(self, task_id): + """ + Reserve a job slot on the Olex2 server for execution. + + Attempts to reserve a job slot with the given task ID, waiting if the + server is currently busy with other jobs. + + Parameters + ---------- + task_id : str + The unique identifier for the job to be reserved. + + Notes + ----- + Polls the server every second until a slot becomes available. + """ + data = self._send_input(f"reserve:{task_id}") + while data == "busy": + time.sleep(1) + data = self._send_input(f"reserve:{task_id}") + + def run_full_refinement( + self, cif_path: Path, tsc_path: Path | None = None, n_cycles: int = 20, refine_starts: int = 5 + ) -> str: + """ + Execute a complete crystallographic refinement workflow. + + Performs a full refinement process including structure loading, optional + aspherical atom form factor configuration via NoSpherA2, and iterative + least-squares refinement. + + Parameters + ---------- + cif_path : Path + The path to the CIF file containing the crystal structure to be refined. + tsc_path : Path or None, optional + The path to the TSC (or TSCB) file for aspherical refinement using + NoSpherA2. If None, standard spherical atom form factors are used. + Default is None. + n_cycles : int, optional + The number of least-squares refinement cycles to perform in each + refinement iteration. Default is 20. + refine_starts : int, optional + The number of times to restart the refinement process to improve + convergence. Default is 5. + + Returns + ------- + str + The complete log output from the refinement process, containing + detailed information about the refinement progress and results. + + Notes + ----- + Uses helper methods to generate command sequences and delegates to + `send_command` for execution. + """ + cmd_string = "\n".join(self.create_refine_cmds(n_cycles, refine_starts)) + return self.send_command(cif_path, tsc_path, cmd_string) def check_connection(self): """ - Checks the connection status with the Olex2 server. + Verify the connection status with the Olex2 server. + + Sends a status query to the Olex2 server to determine if it is ready + to accept new jobs and commands. - Returns: - - bool: True if the server is ready, False otherwise. + Returns + ------- + bool + True if the server responds with "ready" status, False otherwise. """ answer = self._send_input("status") return answer.strip() == "ready" - def send_command(self, input_str: str) -> str: + def send_command(self, cif_path: Path, tsc_path: Path | None, input_str: str) -> str: + """ + Send arbitrary commands to the Olex2 server for execution. + + This method provides a general interface for sending custom command + strings to Olex2. It handles job setup, command execution, and result + retrieval, making it suitable for both standard and custom workflows. + + Parameters + ---------- + cif_path : Path + The path to the CIF file to be used in the command execution. + tsc_path : Path or None + The path to the TSC (or TSCB) file for aspherical refinement. + If None, aspherical refinement is disabled for this command. + input_str : str + The command string to be executed on the Olex2 server. This can + contain multiple Olex2 commands separated by newlines. + + Returns + ------- + str + The complete log output from the command execution, or "No log file found." + if the log file could not be accessed. + + Notes + ----- + This method: + 1. Backs up existing refinement files + 2. Sets up a new job with unique ID + 3. Configures the Olex2 environment + 4. Loads the structure and sets up NoSpherA2 + 5. Executes the provided commands + 6. Waits for completion and retrieves the log + + The method is used internally by `run_full_refinement` but can also + be called directly for custom command sequences. """ - Sends a command string to the Olex2 server and waits for its completion. - Args: - - input_str (str): The command string to send. + cif_path_abs = cif_path.absolute() + self.move_refined_files(cif_path_abs) - Returns: - - str: The output log of the command process, or None if the log file is not found. - """ + job_id = next(self._task_id_counter) + task_id = f"task_{job_id}" + cmds = ( + self.create_startup_cmds(cif_path_abs, job_id) + + self.create_structure_load_cmds(cif_path_abs) + + self.create_nosphera2_cmds(tsc_path) + + [input_str, "@close"] + ) - task_id = next(self._task_id_counter) - _ = self._send_input(f"run:{task_id}\nlog:task_{task_id}.log\n{input_str}") - timeout_counter = 2000 - self.wait_for_completion(timeout_counter, task_id, input_str) + self.reserve_new_job(task_id) - log_path = self.structure_path.parents[0] / f"task_{task_id}.log" + full_cmd = "\n".join(cmds) + _ = self._send_input(full_cmd + "\n") + timeout_counter = 2000 + self.wait_for_completion(timeout_counter, full_cmd) try: - with open(log_path, "r", encoding="UTF-8") as fo: + with open(cif_path_abs.parent / f"task_{job_id}.log", "r", encoding="UTF-8") as fo: output = fo.read() return output except FileNotFoundError: - return None + return "No log file found." - def wait_for_completion(self, timeout_counter: int, task_id: Any, input_str: str): + def wait_for_completion(self, timeout_counter: int, input_str: str): """ - Waits for a specific task to complete on the Olex2 server. - - This method will repeatedly check the status of a task on the Olex2 server every 100 ms - until it is finished. It provides timeout functionality to prevent infinite waiting - and will also raise an error if the task fails on the server. - - Args: - - timeout_counter (int): The maximum number of times to check the status before timing out. - - task_id (Any): The unique identifier of the task to check. - - input_str (str): The original command sent to the server. This is used in the error - message if the task fails. - - Raises: - - RuntimeError: If the task fails during execution on the Olex2 server. + Wait for a submitted job to complete on the Olex2 server. + + Continuously polls the Olex2 server status until the job completes, + with timeout protection and error handling for failed jobs. + + Parameters + ---------- + timeout_counter : int + The maximum number of status checks before timing out. Each check + occurs approximately every 500ms. + input_str : str + The original command string sent to the server. Used in error + messages if the job fails during execution. + + Raises + ------ + RuntimeError + If the server returns an error or failure status during job execution. + + Warns + ----- + UserWarning + If the timeout limit is reached before job completion. + + Notes + ----- + Returns immediately when server reports "ready" status or raises + an exception if "error" or "failed" appears in the response. """ return_msg = " " - while "finished" not in return_msg: - return_msg = self._send_input(f"status:{task_id}") + while "ready" not in return_msg: + return_msg = self._send_input("status\n") time.sleep(0.5) timeout_counter -= 1 if timeout_counter < 0: warnings.warn("TimeOut limit for job reached. Continuing") break - if "failed" in return_msg: + if "error" in return_msg or "failed" in return_msg: raise RuntimeError(f"The command {input_str} raised an error during running in olex.") - def refine(self, n_cycles=20, refine_starts=5): + def _shutdown_server(self): """ - Refines a loaded structure and writes a cif file with the refined structure. + Shut down the Olex2 server. - Returns: - - str: The output log of the refinement process. - """ - if self.structure_path is None: - raise ValueError( - "No structure loaded to refine. The structure_path attribute needs to be" + " set before refinement." - ) - cmds = [ - "spy.set_refinement_program(olex2.refine, Gauss-Newton)", - "DelIns ACTA", - "AddIns ACTA", - ] + [f"refine {n_cycles}"] * refine_starts - return self.send_command("\n".join(cmds)) + Sends a 'stop' command to the Olex2 server to initiate a clean shutdown. + This is typically used when the server needs to be terminated after + completing all refinement tasks. - def _shutdown_server(self): - """Sends a 'stop' command to shut down the Olex2 server.""" + Notes + ----- + This method sends the shutdown command but does not wait for confirmation. + The server should stop accepting new jobs and terminate after completing + any currently running tasks. + """ self._send_input("stop") diff --git a/tests/robots/olex/test_olex2.py b/tests/robots/olex/test_olex2.py index 1b49830..6324c88 100644 --- a/tests/robots/olex/test_olex2.py +++ b/tests/robots/olex/test_olex2.py @@ -11,7 +11,7 @@ import shutil import warnings from pathlib import Path -from unittest.mock import Mock, PropertyMock, call, patch +from unittest.mock import Mock, call, patch import numpy as np import pytest @@ -54,8 +54,12 @@ def test_olex2_refine_live(tmp_path): work_path.with_suffix(".ins").touch() olex2 = Olex2Socket() - olex2.structure_path = work_path - _ = olex2.refine() + olex2.run_full_refinement( + work_path, + None, + n_cycles=8, + refine_starts=5, + ) target_path = "./tests/robots/olex/cif_files/refine_conv_nonHaniso.cif" cif_target = cif.reader(str(target_path)).model()["epoxide"] @@ -71,16 +75,20 @@ def test_olex2_refine_live(tmp_path): @pytest.mark.program_dependent def test_olex2_refine_tsc_live(tmp_path): - work_path = os.path.join(tmp_path, "work.cif") + work_path = tmp_path / "work.cif" shutil.copy("./tests/robots/olex/cif_files/refine_nonconv_allaniso.cif", work_path) - tsc_path = Path("./tests/robots/olex/cif_files/refine_allaniso.tscb").absolute() + input_tsc_path = Path("./tests/robots/olex/cif_files/refine_allaniso.tscb").absolute() + shutil.copy(input_tsc_path, tmp_path / input_tsc_path.name) + tsc_path = tmp_path / input_tsc_path.name olex2 = Olex2Socket() - olex2.structure_path = work_path - - olex2.tsc_path = tsc_path - _ = olex2.refine() + olex2.run_full_refinement( + work_path, + tsc_path, + n_cycles=8, + refine_starts=5, + ) target_path = "./tests/robots/olex/cif_files/refine_conv_allaniso.cif" cif_target = cif.reader(str(target_path)).model()["epoxide"] @@ -101,12 +109,10 @@ def test_olex2_init(): olex2 = Olex2Socket() assert olex2.server == "localhost" assert olex2.port == 8899 - assert olex2.structure_path is None olex2 = Olex2Socket("test_server", 1234) assert olex2.server == "test_server" assert olex2.port == 1234 - assert olex2.structure_path is None @pytest.fixture(name="olex2_env_vars") @@ -121,18 +127,17 @@ def fixture_olex2_env_vars(): def test_olex2_init_env(olex2_env_vars): olex2 = Olex2Socket() assert olex2.server == "another_server" - assert olex2.port == "12345" - assert olex2.structure_path is None + assert olex2.port == 12345 def test_olex2_wait_for_completion(): - # create a mock answer generator that returns "test_response" 10 times and then "finished" + # create a mock answer generator that returns "test_response" 10 times and then "ready" def mock_send_input_func(): n = 0 while n < 10: n += 1 yield "test_response" - yield "finished" + yield "ready" mock_send_input_iter = mock_send_input_func() @@ -142,15 +147,15 @@ def mock_send_input_func(): with patch("qcrboxtools.robots.olex2.Olex2Socket._send_input", mock_send_input): with warnings.catch_warnings(): warnings.simplefilter("error") - olex2.wait_for_completion(11, "test", "test_input") + olex2.wait_for_completion(11, "test_input") assert mock_send_input.call_count == 11 - assert mock_send_input.call_args_list == [call("status:test")] * 11 + assert mock_send_input.call_args_list == [call("status\n")] * 11 mock_send_input_iter = mock_send_input_func() mock_send_input.side_effect = lambda *args: next(mock_send_input_iter) with pytest.warns(UserWarning): - olex2.wait_for_completion(1, "test", "test_input") + olex2.wait_for_completion(1, "test_input") def test_wait_for_completion_failed(): @@ -158,140 +163,136 @@ def test_wait_for_completion_failed(): with patch("qcrboxtools.robots.olex2.Olex2Socket._send_input", lambda *args: "failed"): with pytest.raises(RuntimeError): - olex2.wait_for_completion(1, "test", "test_input") + olex2.wait_for_completion(1, "test_input") @patch("qcrboxtools.robots.olex2.Olex2Socket.wait_for_completion") @patch("qcrboxtools.robots.olex2.Olex2Socket._send_input") -def test_olex2_send_command(mock_send_input, mock_wait_for_completion, tmp_path): - # test that the task id is incremented and the command is sent, no log is read as none exists - # mock_structure_path = PropertyMock(return_value= tmp_path / "test.cif") +@patch("qcrboxtools.robots.olex2.Olex2Socket.reserve_new_job") +@patch("qcrboxtools.robots.olex2.Olex2Socket.move_refined_files") +@patch("qcrboxtools.robots.olex2.Olex2Socket.create_startup_cmds") +@patch("qcrboxtools.robots.olex2.Olex2Socket.create_structure_load_cmds") +@patch("qcrboxtools.robots.olex2.Olex2Socket.create_nosphera2_cmds") +def test_olex2_send_command_orchestration( + mock_create_nosphera2_cmds, + mock_create_structure_load_cmds, + mock_create_startup_cmds, + mock_move_refined_files, + mock_reserve_new_job, + mock_send_input, + mock_wait_for_completion, + tmp_path, +): + """Test that send_command orchestrates all steps in correct order.""" + # Setup mock returns + mock_create_startup_cmds.return_value = ["startup command"] + mock_create_structure_load_cmds.return_value = ["load command"] + mock_create_nosphera2_cmds.return_value = ["nosphera command"] + + # Create test files and log + cif_path = tmp_path / "test.cif" + cif_path.touch() + tsc_path = tmp_path / "aspherical.tscb" + tsc_path.touch() + log_path = tmp_path / "task_0.log" + log_path.write_text("test_log_content") + + olex2 = Olex2Socket() + result = olex2.send_command(cif_path, tsc_path, "custom_input") - with patch.object(Olex2Socket, "structure_path", PropertyMock): - olex2 = Olex2Socket(structure_path=tmp_path / "test.cif") - assert olex2._task_id_counter.__repr__() == "count(0)" - assert olex2.send_command("test_input") is None - mock_send_input.assert_called_once_with("run:0\nlog:task_0.log\ntest_input") - assert mock_wait_for_completion.called - assert olex2._task_id_counter.__repr__() == "count(1)" + # Check that task counter increments + assert olex2._task_id_counter.__repr__() == "count(1)" - # test that log is read when exists - (tmp_path / "task_1.log").write_text("test_log_content") - assert olex2.send_command("test_input") == "test_log_content" - assert olex2._task_id_counter.__repr__() == "count(2)" - assert mock_send_input.call_count == 2 + # Check that all orchestration steps are called in order + mock_move_refined_files.assert_called_once_with(cif_path.absolute()) + mock_create_startup_cmds.assert_called_once_with(cif_path.absolute(), 0) + mock_create_structure_load_cmds.assert_called_once_with(cif_path.absolute()) + mock_create_nosphera2_cmds.assert_called_once_with(tsc_path) + mock_reserve_new_job.assert_called_once_with("task_0") + mock_wait_for_completion.assert_called_once() + + # Check that log content is returned + assert result == "test_log_content" @patch("qcrboxtools.robots.olex2.Olex2Socket.wait_for_completion") -@patch("qcrboxtools.robots.olex2.Olex2Socket.send_command") @patch("qcrboxtools.robots.olex2.Olex2Socket._send_input") -def test_olex2_structure_path_setter(mock_send_input, mock_send_command, mock_wait_for_input, tmp_path): - (tmp_path / "test.cif").touch() - (tmp_path / "test.ins").touch() - (tmp_path / "test.hkl").touch() +@patch("qcrboxtools.robots.olex2.Olex2Socket.reserve_new_job") +@patch("qcrboxtools.robots.olex2.Olex2Socket.move_refined_files") +@patch("qcrboxtools.robots.olex2.Olex2Socket.create_startup_cmds") +@patch("qcrboxtools.robots.olex2.Olex2Socket.create_structure_load_cmds") +@patch("qcrboxtools.robots.olex2.Olex2Socket.create_nosphera2_cmds") +def test_olex2_send_command_task_id_increment( + mock_create_nosphera2_cmds, + mock_create_structure_load_cmds, + mock_create_startup_cmds, + mock_move_refined_files, + mock_reserve_new_job, + mock_send_input, + mock_wait_for_completion, + tmp_path, +): + """Test that task IDs increment correctly across multiple calls.""" + # Setup mocks + mock_create_startup_cmds.return_value = ["startup"] + mock_create_structure_load_cmds.return_value = ["load"] + mock_create_nosphera2_cmds.return_value = ["nosphera"] + + cif_path = tmp_path / "test.cif" + cif_path.touch() + + # Create log files for both calls + (tmp_path / "task_0.log").write_text("first call") + (tmp_path / "task_1.log").write_text("second call") olex2 = Olex2Socket() - olex2.structure_path = tmp_path / "test.cif" - assert olex2.structure_path == tmp_path / "test.cif" - assert mock_send_command.call_count == 1 - assert mock_send_input.call_count == 1 - assert mock_wait_for_input.call_count == 1 - # Check the existence of moved files - assert (tmp_path / "test_moved.ins").exists() - assert (tmp_path / "test_moved.hkl").exists() - - (tmp_path / "test2.cif").touch() - olex2.structure_path = tmp_path / "test2.cif" - - assert olex2.structure_path == tmp_path / "test2.cif" - assert mock_send_command.call_count == 2 - assert mock_send_input.call_count == 2 - assert mock_wait_for_input.call_count == 2 - assert not (tmp_path / "test_moved2.ins").exists() - assert not (tmp_path / "test_moved2.hkl").exists() + # First call + result1 = olex2.send_command(cif_path, None, "input1") + assert result1 == "first call" + mock_create_startup_cmds.assert_called_with(cif_path.absolute(), 0) + mock_reserve_new_job.assert_called_with("task_0") - # test that the task id is set to a higher value when log files exist - (tmp_path / "task_1.log").touch() - olex2.structure_path = tmp_path / "test.cif" - assert olex2._task_id_counter.__repr__() == "count(2)" + # Second call + result2 = olex2.send_command(cif_path, None, "input2") + assert result2 == "second call" + # Check that task ID incremented + assert mock_create_startup_cmds.call_args_list[-1][0][1] == 1 # Second call uses job_id=1 + assert mock_reserve_new_job.call_args_list[-1][0][0] == "task_1" # Second call uses task_1 - # test that a tsc_file is copied to the structure_path folder if it is not there - tsc_path = tmp_path / "test.tscb" - tsc_path.touch() - olex2.tsc_path = tsc_path - subfolder = tmp_path / "subfolder" - subfolder.mkdir() - subfolder_cif = subfolder / "test.cif" - subfolder_cif.touch() - olex2.structure_path = subfolder_cif - assert (subfolder / tsc_path.name).exists() - # test that warning is raised, when send_command raises a RuntimeError - olex2.tsc_path = None # Runtime Errors are only catched while loading a structure - mock_send_command.side_effect = RuntimeError - with pytest.warns(UserWarning): - olex2.structure_path = tmp_path / "test.cif" +@patch("qcrboxtools.robots.olex2.Olex2Socket.wait_for_completion") +@patch("qcrboxtools.robots.olex2.Olex2Socket._send_input") +@patch("qcrboxtools.robots.olex2.Olex2Socket.reserve_new_job") +@patch("qcrboxtools.robots.olex2.Olex2Socket.move_refined_files") +@patch("qcrboxtools.robots.olex2.Olex2Socket.create_startup_cmds") +@patch("qcrboxtools.robots.olex2.Olex2Socket.create_structure_load_cmds") +@patch("qcrboxtools.robots.olex2.Olex2Socket.create_nosphera2_cmds") +def test_olex2_send_command_no_log( + mock_create_nosphera2_cmds, + mock_create_structure_load_cmds, + mock_create_startup_cmds, + mock_move_refined_files, + mock_reserve_new_job, + mock_send_input, + mock_wait_for_completion, + tmp_path, +): + """Test send_command when log file doesn't exist.""" + # Setup mocks + mock_create_startup_cmds.return_value = ["startup"] + mock_create_structure_load_cmds.return_value = ["load"] + mock_create_nosphera2_cmds.return_value = ["nosphera"] + + cif_path = tmp_path / "test.cif" + cif_path.touch() + # Note: no log file created + olex2 = Olex2Socket() + result = olex2.send_command(cif_path, None, "test_input") -@patch("qcrboxtools.robots.olex2.Olex2Socket.send_command") -def test_olex2_tsc_path_setter(mock_send_command, tmp_path): - with patch.object(Olex2Socket, "structure_path", PropertyMock): - # test that ValueError is raised when setting tsc_path without setting structure_path - olex2 = Olex2Socket() - olex2.structure_path = None - with pytest.raises(ValueError): - olex2.tsc_path = tmp_path / "test.tscb" - - # test that tsc_path is set correctly in intended case - - cif1_path = tmp_path / "test1.cif" - cif1_path.touch() - - olex2 = Olex2Socket(structure_path=cif1_path) - - tsc_path = tmp_path / "test.tscb" - tsc_path.touch() - - olex2.tsc_path = tsc_path - assert olex2.tsc_path == tsc_path - assert mock_send_command.call_args_list[-1] == call( - "\n".join( - [ - "spy.SetParam('snum.NoSpherA2.use_aspherical', True)", - "spy.SetParam('snum.NoSpherA2.Calculate', False)", - "spy.SetParam('snum.NoSpherA2.source', 'tsc_file')", - f"spy.SetParam('snum.NoSpherA2.file', '{tsc_path.name}')", - ] - ) - ) - - # test that the tsc file is not set if it does not exist - tsc_path.unlink() - with pytest.raises(FileNotFoundError): - olex2.tsc_path = tsc_path - - # test that the tsc file is copied to the structure_path folder if somewhere else - structure_subfolder = tmp_path / "structure" - structure_subfolder.mkdir() - (structure_subfolder / "test.cif").touch() - olex2.structure_path = structure_subfolder / "test.cif" - tsc_path.touch() - olex2.tsc_path = tsc_path - assert (structure_subfolder / tsc_path.name).exists() - - # test that NoSpherA2 commands are send when tsc_path is set to None - olex2.tsc_path = None - assert olex2.tsc_path is None - assert mock_send_command.call_args_list[-1] == call( - "\n".join( - [ - "spy.SetParam('snum.NoSpherA2.use_aspherical', False)", - "spy.SetParam('snum.NoSpherA2.Calculate', False)", - "spy.SetParam('snum.NoSpherA2.file', None)", - ] - ) - ) + # Should return the default message when log file not found + assert result == "No log file found." @patch("qcrboxtools.robots.olex2.Olex2Socket._send_input", new_callable=lambda: Mock(return_value="ready")) @@ -301,25 +302,227 @@ def test_check_connection(mock_send_input): assert mock_send_input.call_args_list == [call("status")] -@patch("qcrboxtools.robots.olex2.Olex2Socket.send_command") -def test_olex2_refine(mock_send_command, tmp_path): - with patch.object(Olex2Socket, "structure_path", PropertyMock): - olex2 = Olex2Socket() +@patch("qcrboxtools.robots.olex2.Olex2Socket._send_input") +def test_shutdown(mock_send_input): + olex2 = Olex2Socket() + olex2._shutdown_server() + assert mock_send_input.call_args_list == [call("stop")] + + +def test_move_refined_files(tmp_path): + """Test that move_refined_files correctly handles existing files.""" + olex2 = Olex2Socket() + + # Create test files + test_cif = tmp_path / "test.cif" + test_ins = tmp_path / "test.ins" + test_hkl = tmp_path / "test.hkl" + + test_cif.touch() + test_ins.write_text("test ins content") + test_hkl.write_text("test hkl content") + + # Run the method + olex2.move_refined_files(test_cif) + + # Check implementation behavior: files should be moved, not copied + assert not test_ins.exists() + assert not test_hkl.exists() + + # Check that moved files exist with expected naming pattern + moved_ins = tmp_path / "test_moved.ins" + moved_hkl = tmp_path / "test_moved.hkl" + + assert moved_ins.exists() + assert moved_hkl.exists() + assert moved_ins.read_text() == "test ins content" + assert moved_hkl.read_text() == "test hkl content" + + +def test_move_refined_files_no_files(tmp_path): + """Test that move_refined_files handles missing files gracefully.""" + olex2 = Olex2Socket() + test_cif = tmp_path / "test.cif" + test_cif.touch() + + # Should not raise an error when files don't exist + try: + olex2.move_refined_files(test_cif) + except Exception as e: + pytest.fail(f"move_refined_files raised an exception when files don't exist: {e}") + + +def test_move_refined_files_partial_files(tmp_path): + """Test move_refined_files when only some files exist.""" + olex2 = Olex2Socket() + + test_cif = tmp_path / "test.cif" + test_ins = tmp_path / "test.ins" + # Note: no .hkl file + + test_cif.touch() + test_ins.write_text("only ins content") + + olex2.move_refined_files(test_cif) + + # Should move the existing file + assert not test_ins.exists() + moved_ins = tmp_path / "test_moved.ins" + assert moved_ins.exists() + assert moved_ins.read_text() == "only ins content" + + # Should not create .hkl files where none existed + moved_hkl = tmp_path / "test_moved.hkl" + assert not moved_hkl.exists() + + +def test_create_structure_load_cmds(): + """Test that create_structure_load_cmds includes essential loading commands.""" + olex2 = Olex2Socket() + cif_path = Path("/test/path/structure.cif") + + cmds = olex2.create_structure_load_cmds(cif_path) + + # Check that essential commands are present + cmd_string = "\n".join(cmds) + assert "user /test/path" in cmd_string # Sets working directory + assert "@reap /test/path/structure.cif" in cmd_string # Loads the CIF + assert "file structure.ins" in cmd_string # Loads INS file + assert "@export structure.hkl" in cmd_string # Exports HKL data + assert "@reap /test/path/structure.ins" in cmd_string # Loads INS file + + +def test_create_startup_cmds(): + """Test that create_startup_cmds includes essential startup commands.""" + olex2 = Olex2Socket() + cif_path = Path("/test/path/structure.cif") + job_id = 42 + + cmds = olex2.create_startup_cmds(cif_path, job_id) + + cmd_string = "\n".join(cmds) + # Check for essential startup elements + assert f"run:task_{job_id}" in cmd_string # Sets task ID + assert f"xlog:/test/path/task_{job_id}.log" in cmd_string # Sets log file + assert "user /test/path" in cmd_string # Sets working directory + assert f"task_{job_id}" in cmd_string # Job ID is referenced + assert "remote_mode true" in cmd_string # Remote mode enabled + + +def test_create_nosphera2_cmds_no_tsc(): + """Test create_nosphera2_cmds disables aspherical refinement when no tsc file.""" + olex2 = Olex2Socket() + + cmds = olex2.create_nosphera2_cmds(None) - # test that ValueError is raised when no structure loaded - with pytest.raises(ValueError): - olex2.structure_path = None - olex2.refine() + cmd_string = "\n".join(cmds) + # Should disable aspherical refinement + assert "use_aspherical', False" in cmd_string + assert "Calculate', False" in cmd_string - (tmp_path / "test.cif").touch() - olex2.structure_path = tmp_path / "test.cif" - olex2.refine(n_cycles=5, refine_starts=4) - cmd_string = "\n".join(["refine 5"] * 4) - assert cmd_string in mock_send_command.call_args[0][0] +def test_create_nosphera2_cmds_with_tsc(): + """Test create_nosphera2_cmds enables aspherical refinement with tsc file.""" + olex2 = Olex2Socket() + tsc_path = Path("/test/path/aspherical.tscb") + + cmds = olex2.create_nosphera2_cmds(tsc_path) + + cmd_string = "\n".join(cmds) + # Should enable aspherical refinement + assert "use_aspherical', True" in cmd_string + assert "Calculate', False" in cmd_string + assert "aspherical.tscb" in cmd_string + + +def test_create_refine_cmds(): + """Test that create_refine_cmds produces correct number of refinement cycles.""" + olex2 = Olex2Socket() + cmds = olex2.create_refine_cmds(n_cycles=15, refine_starts=4) + + # Should have setup commands + refinement commands + refine_commands = [cmd for cmd in cmds if cmd.startswith("refine ")] + setup_commands = [cmd for cmd in cmds if not cmd.startswith("refine ")] + + # Check implementation details + assert len(refine_commands) == 4 # Should have 4 refine starts + assert all("refine 15" == cmd for cmd in refine_commands) # Each should use 15 cycles + assert len(setup_commands) >= 3 # Should have setup commands (refinement program, ACTA handling) + + # Check that essential setup is present + setup_string = "\n".join(setup_commands) + assert "olex2.refine" in setup_string # Uses olex2 refinement + assert "ACTA" in setup_string # Handles ACTA instructions + + +@patch("qcrboxtools.robots.olex2.time.sleep") @patch("qcrboxtools.robots.olex2.Olex2Socket._send_input") -def test_shutdown(mock_send_input): +def test_reserve_new_job_success(mock_send_input, mock_sleep): + """Test reserve_new_job when reservation is successful immediately.""" olex2 = Olex2Socket() - olex2._shutdown_server() - assert mock_send_input.call_args_list == [call("stop")] + mock_send_input.return_value = "reserved" + + olex2.reserve_new_job("test_task") + + mock_send_input.assert_called_once_with("reserve:test_task") + mock_sleep.assert_not_called() + + +@patch("qcrboxtools.robots.olex2.time.sleep") +@patch("qcrboxtools.robots.olex2.Olex2Socket._send_input") +def test_reserve_new_job_busy_then_success(mock_send_input, mock_sleep): + """Test reserve_new_job when server is busy then becomes available.""" + olex2 = Olex2Socket() + + mock_send_input.side_effect = ["busy", "reserved"] + + olex2.reserve_new_job("test_task") + + assert mock_send_input.call_count == 2 + mock_send_input.assert_any_call("reserve:test_task") + mock_sleep.assert_called_once_with(1) + + +@patch("qcrboxtools.robots.olex2.Olex2Socket.send_command") +@patch("qcrboxtools.robots.olex2.Olex2Socket.create_refine_cmds") +def test_run_full_refinement(mock_create_refine_cmds, mock_send_command): + """Test that run_full_refinement orchestrates refinement correctly.""" + olex2 = Olex2Socket() + + # Setup mock returns + mock_create_refine_cmds.return_value = ["setup", "refine 25", "refine 25", "refine 25"] + mock_send_command.return_value = "refinement log output" + + cif_path = Path("/test/structure.cif") + tsc_path = Path("/test/aspherical.tscb") + + result = olex2.run_full_refinement(cif_path, tsc_path, n_cycles=25, refine_starts=3) + + # Check that refinement commands are created with correct parameters + mock_create_refine_cmds.assert_called_once_with(25, 3) + + # Check that send_command is called with paths and joined commands + mock_send_command.assert_called_once_with(cif_path, tsc_path, "setup\nrefine 25\nrefine 25\nrefine 25") + + # Check return value passes through + assert result == "refinement log output" + + +@patch("qcrboxtools.robots.olex2.Olex2Socket.send_command") +@patch("qcrboxtools.robots.olex2.Olex2Socket.create_refine_cmds") +def test_run_full_refinement_defaults(mock_create_refine_cmds, mock_send_command): + """Test run_full_refinement with default parameters.""" + olex2 = Olex2Socket() + + mock_create_refine_cmds.return_value = ["refine 20"] * 5 # Default 5 starts, 20 cycles + mock_send_command.return_value = "default refinement" + + cif_path = Path("/test/structure.cif") + + result = olex2.run_full_refinement(cif_path) # Use defaults + + # Should use default parameters + mock_create_refine_cmds.assert_called_once_with(20, 5) # Default n_cycles=20, refine_starts=5 + mock_send_command.assert_called_once_with(cif_path, None, "refine 20\nrefine 20\nrefine 20\nrefine 20\nrefine 20") + assert result == "default refinement" From b023386d904947a1c0fe3cff704d0f23789ec268 Mon Sep 17 00:00:00 2001 From: Niolon Date: Mon, 22 Sep 2025 13:16:37 +0100 Subject: [PATCH 2/2] Small improvements --- qcrboxtools/robots/basesocket.py | 4 +++- qcrboxtools/robots/olex2.py | 9 ++++++++- tests/robots/olex/test_olex2.py | 5 +---- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/qcrboxtools/robots/basesocket.py b/qcrboxtools/robots/basesocket.py index 82b3c66..296fb91 100644 --- a/qcrboxtools/robots/basesocket.py +++ b/qcrboxtools/robots/basesocket.py @@ -51,6 +51,8 @@ def _send_input(self, input_str: str) -> str: s.sendall(input_str.encode("UTF-8")) data = s.recv(1024) except (ConnectionRefusedError, OSError) as ex: - raise ConnectionError(f"Could not connect to Socket server, {self.server}:{self.port}") from ex + raise ConnectionError( + f"Could not connect to {self.__class__.__name__} at {self.server}:{self.port}" + ) from ex return data.decode("UTF-8").rstrip("\n") diff --git a/qcrboxtools/robots/olex2.py b/qcrboxtools/robots/olex2.py index 46b3a7e..c49f013 100644 --- a/qcrboxtools/robots/olex2.py +++ b/qcrboxtools/robots/olex2.py @@ -113,7 +113,14 @@ def __init__( if olex_server == "localhost" and "OLEX2SERVER" in os.environ: olex_server = os.environ["OLEX2SERVER"] if port == 8899 and "OLEX2PORT" in os.environ: - port = int(os.environ["OLEX2PORT"]) + try: + port = int(os.environ["OLEX2PORT"]) + except ValueError: + warnings.warn( + f"OLEX2PORT environment variable ('{os.environ['OLEX2PORT']}')" + + " is not a valid integer. Using default port 8899." + ) + port = 8899 super().__init__(olex_server, port) self._task_id_counter = count() diff --git a/tests/robots/olex/test_olex2.py b/tests/robots/olex/test_olex2.py index 6324c88..f41dad6 100644 --- a/tests/robots/olex/test_olex2.py +++ b/tests/robots/olex/test_olex2.py @@ -346,10 +346,7 @@ def test_move_refined_files_no_files(tmp_path): test_cif.touch() # Should not raise an error when files don't exist - try: - olex2.move_refined_files(test_cif) - except Exception as e: - pytest.fail(f"move_refined_files raised an exception when files don't exist: {e}") + olex2.move_refined_files(test_cif) def test_move_refined_files_partial_files(tmp_path):