diff --git a/fia_api/scripts/pearl_automation.py b/fia_api/scripts/pearl_automation.py new file mode 100644 index 00000000..93f01a3e --- /dev/null +++ b/fia_api/scripts/pearl_automation.py @@ -0,0 +1,252 @@ +import argparse +import logging +import os +import sys +import time +from pathlib import Path +from typing import Any + +import requests + +from fia_api.core.models import State + +# Simple Mantid Script for PEARL as provided in the issue +PEARL_SCRIPT = """ +from mantid.simpleapi import * +import numpy as np + +Cycles2Run=['25_4'] +Path2Save = r'E:\\\\Data\\\\Moderator' +Path2Data = r'X:\\\\data' + +CycleDict = { + "start_25_4": 124987, + "end_25_4": 124526, +} + +for cycle in Cycles2Run: + reject=[] + peak_centres=[] + peak_centres_error=[] + peak_intensity=[] + peak_intensity_error=[] + uAmps=[] + RunNo=[] + index=0 + start=CycleDict['start_'+cycle] + end=CycleDict['end_'+cycle] + for i in range(start,end+1): + if i == 95382: + continue + Load(Filename=Path2Data+'\\\\cycle_'+cycle+'\\\\PEARL00'+ str(i)+'.nxs', OutputWorkspace=str(i)) + ws = mtd[str(i)] + run = ws.getRun() + pcharge = run.getProtonCharge() + if pcharge <1.0: + reject.append(str(i)) + DeleteWorkspace(str(i)) + continue + NormaliseByCurrent(InputWorkspace=str(i), OutputWorkspace=str(i)) + ExtractSingleSpectrum(InputWorkspace=str(i),WorkspaceIndex=index, + OutputWorkspace=str(i)+ '_' + str(index)) + CropWorkspace(InputWorkspace=str(i)+ '_' + str(index), Xmin=1100, + Xmax=19990, OutputWorkspace=str(i)+ '_' + str(index)) + DeleteWorkspace(str(i)) + + fit_output = Fit(Function='name=Gaussian,Height=19.2327,\\ + PeakCentre=4843.8,Sigma=1532.64,\\ + constraints=(4600 5200.0: + DeleteWorkspace(str(i)+'_0_fit_Parameters') + DeleteWorkspace(str(i)+'_0_fit_Workspace') + DeleteWorkspace(str(i)+'_0') + DeleteWorkspace(str(i)+'_0_fit_NormalisedCovarianceMatrix') + reject.append(str(i)) + continue + else: + uAmps.append(pcharge) + peak_centres.append(paramTable.column(1)[1]) + peak_centres_error.append(paramTable.column(2)[1]) + peak_intensity.append(paramTable.column(1)[0]) + peak_intensity_error.append(paramTable.column(2)[0]) + RunNo.append(str(i)) + DeleteWorkspace(str(i)+'_0') + DeleteWorkspace(str(i)+'_0_fit_Parameters') + DeleteWorkspace(str(i)+'_0_fit_Workspace') + DeleteWorkspace(str(i)+'_0_fit_NormalisedCovarianceMatrix') + + combined_data=np.column_stack( + (RunNo, uAmps, peak_intensity, peak_intensity_error, peak_centres, peak_centres_error) + ) + np.savetxt(Path2Save+'\\\\peak_centres_'+cycle+'.csv', combined_data, delimiter=", ", fmt='% s',) +""" + +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) + + +class PearlAutomation: + def __init__( + self, + fia_url: str, + auth_url: str, + username: str | None, + password: str | None, + output_dir: str | Path, + runner_image: str | None = None, + ) -> None: + self.fia_url = fia_url.rstrip("/") + self.auth_url = auth_url.rstrip("/") + self.username = username + self.password = password + self.output_dir = Path(output_dir) + self.runner_image = runner_image + self.token: str | None = None + + def authenticate(self) -> None: + logger.info(f"Authenticating user {self.username} at {self.auth_url}") + try: + response = requests.post( + f"{self.auth_url}/login", json={"username": self.username, "password": self.password}, timeout=30 + ) + response.raise_for_status() + self.token = response.json().get("token") + if not self.token: + raise ValueError("No token found in login response") + logger.info("Authentication successful") + except Exception as e: + logger.error(f"Authentication failed: {e}") + raise + + def get_headers(self) -> dict[str, str]: + return {"Authorization": f"Bearer {self.token}"} + + def get_runner_image(self) -> str: + if self.runner_image: + return self.runner_image + + logger.info("Fetching available Mantid runners") + response = requests.get(f"{self.fia_url}/jobs/runners", headers=self.get_headers(), timeout=30) + response.raise_for_status() + runners = response.json() + if not runners: + raise ValueError("No Mantid runners found") + + # Select latest version if possible, or just the first one + latest_version = sorted(runners.keys())[-1] + logger.info(f"Selected Mantid runner: {latest_version}") + return str(latest_version) + + def submit_job(self, script: str, runner_image: str) -> int: + logger.info(f"Submitting simple job with runner {runner_image}") + payload = {"runner_image": runner_image, "script": script} + response = requests.post(f"{self.fia_url}/job/simple", json=payload, headers=self.get_headers(), timeout=30) + response.raise_for_status() + job_id = int(response.json()) + logger.info(f"Job submitted successfully. Job ID: {job_id}") + return job_id + + def monitor_job(self, job_id: int, poll_interval: int = 5) -> dict[str, Any]: + logger.info(f"Monitoring job {job_id}") + while True: + response = requests.get(f"{self.fia_url}/job/{job_id}", headers=self.get_headers(), timeout=30) + response.raise_for_status() + job_data: dict[str, Any] = response.json() + state = job_data.get("state") + + logger.info(f"Job {job_id} current state: {state}") + + if state == State.SUCCESSFUL.value: + logger.info(f"Job {job_id} completed successfully") + return job_data + if state in [State.ERROR.value, State.UNSUCCESSFUL.value]: + error_msg = job_data.get("status_message", "No error message provided") + logger.error(f"Job {job_id} failed with state {state}: {error_msg}") + raise RuntimeError(f"Job {job_id} failed: {error_msg}") + + time.sleep(poll_interval) + + def download_results(self, job_id: int, outputs: str | list[str] | None) -> None: + if not outputs: + logger.warning(f"No outputs found for job {job_id}") + return + + # Outputs is expected to be a string or list of filenames + filenames = outputs.split(",") if isinstance(outputs, str) else outputs + + self.output_dir.mkdir(parents=True, exist_ok=True) + + for file in filenames: + filename = file.strip() + if not filename: + continue + + logger.info(f"Downloading {filename} for job {job_id}") + response = requests.get( + f"{self.fia_url}/job/{job_id}/filename/{filename}", headers=self.get_headers(), timeout=30, stream=True + ) + response.raise_for_status() + + file_path = self.output_dir / filename + with Path.open(file_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + logger.info(f"Downloaded {filename} to {file_path}") + + def run(self) -> None: + try: + self.authenticate() + runner_image = self.get_runner_image() + job_id = self.submit_job(PEARL_SCRIPT, runner_image) + job_data = self.monitor_job(job_id) + self.download_results(job_id, job_data.get("outputs")) + logger.info("PEARL automation completed successfully") + except Exception as e: + logger.error(f"PEARL automation failed: {e}") + sys.exit(1) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Automate PEARL Mantid jobs via FIA API") + parser.add_argument("--fia-url", default=os.environ.get("FIA_API_URL", "http://localhost:8080"), help="FIA API URL") + parser.add_argument( + "--auth-url", default=os.environ.get("AUTH_API_URL", "http://localhost:8001"), help="Auth API URL" + ) + parser.add_argument("--username", default=os.environ.get("PEARL_USERNAME"), help="Auth Username") + parser.add_argument("--password", default=os.environ.get("PEARL_PASSWORD"), help="Auth Password") + parser.add_argument( + "--output-dir", default=os.environ.get("OUTPUT_DIRECTORY", "./output"), help="Output directory for results" + ) + parser.add_argument( + "--runner", default=os.environ.get("MANTID_RUNNER_IMAGE"), help="Specific Mantid runner image to use" + ) + + args = parser.parse_args() + + if not args.username or not args.password: + err_msg = ( + "Username and password must be provided via " + "arguments or environment variables (PEARL_USERNAME, PEARL_PASSWORD)" + ) + logger.error(err_msg) + sys.exit(1) + + automation = PearlAutomation( + args.fia_url, args.auth_url, args.username, args.password, args.output_dir, args.runner + ) + automation.run() + + +if __name__ == "__main__": + main() diff --git a/pearl_automation.env.example b/pearl_automation.env.example new file mode 100644 index 00000000..23c8e4c0 --- /dev/null +++ b/pearl_automation.env.example @@ -0,0 +1,13 @@ +# FIA API and Auth API URLs +FIA_API_URL=http://localhost:8080 +AUTH_API_URL=http://localhost:8001 + +# Credentials +PEARL_USERNAME=your_username +PEARL_PASSWORD=your_password + +# Output configuration +OUTPUT_DIRECTORY=./output_pearl + +# Optional: Specific Mantid runner image +# MANTID_RUNNER_IMAGE=6.13.1 diff --git a/test/scripts/test_pearl_automation.py b/test/scripts/test_pearl_automation.py new file mode 100644 index 00000000..019577aa --- /dev/null +++ b/test/scripts/test_pearl_automation.py @@ -0,0 +1,238 @@ +import os +import subprocess +import sys +import unittest +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +# Add the project root to sys.path to import the script +sys.path.append(str(Path(__file__).parent.parent.parent)) + +from fia_api.core.models import State +from fia_api.scripts.pearl_automation import PearlAutomation, main + + +@pytest.fixture(scope="session") +def get_automation(): + fia_url = "http://fia-api" + auth_url = "http://auth-api" + username = "test_user" + password = "test_pass" # noqa S105 + output_dir = "./test_output" + return PearlAutomation(fia_url, auth_url, username, password, output_dir) + + +@patch("fia_api.scripts.pearl_automation.requests.post") +def test_authenticate_success(mock_post, get_automation): + automation = get_automation + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"token": "valid_token"} + mock_post.return_value = mock_response + + automation.authenticate() + assert automation.token == "valid_token" # noqa S105 + mock_post.assert_called_once_with( + f"{automation.auth_url}/login", + json={"username": automation.username, "password": automation.password}, + timeout=30, + ) + + +@patch("fia_api.scripts.pearl_automation.requests.post") +def test_authenticate_no_token_raises_error(mock_post, get_automation): + automation = get_automation + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {} # Missing token + mock_post.return_value = mock_response + + with pytest.raises(ValueError, match="No token found in login response"): + automation.authenticate() + + +@patch("fia_api.scripts.pearl_automation.requests.get") +def test_get_runner_image_success(mock_get, get_automation): + automation = get_automation + automation.token = "valid_token" # noqa S105 + automation.runner_image = None + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"6.8.0": "sha1", "6.9.0": "sha2"} + mock_get.return_value = mock_response + + runner = automation.get_runner_image() + assert runner == "6.9.0" + mock_get.assert_called_once() + + +def test_get_runner_image_already_set(get_automation): + automation = get_automation + automation.runner_image = "custom-runner" + runner = automation.get_runner_image() + assert runner == "custom-runner" + + +@patch("fia_api.scripts.pearl_automation.requests.get") +def test_get_runner_image_empty_raises_error(mock_get, get_automation): + automation = get_automation + automation.token = "valid_token" # noqa S105 + automation.runner_image = None + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {} + mock_get.return_value = mock_response + + with pytest.raises(ValueError, match="No Mantid runners found"): + automation.get_runner_image() + + +@patch("fia_api.scripts.pearl_automation.requests.post") +def test_submit_job_success(mock_post, get_automation): + automation = get_automation + automation.token = "valid_token" # noqa S105 + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = 12345 + mock_post.return_value = mock_response + + job_id = automation.submit_job("print('hello')", "6.9.0") + expected_job_id = 12345 + assert job_id == expected_job_id + mock_post.assert_called_once() + + +@patch("fia_api.scripts.pearl_automation.requests.get") +@patch("fia_api.scripts.pearl_automation.time.sleep", return_value=None) +def test_monitor_job_success(mock_sleep, mock_get, get_automation): + automation = get_automation + automation.token = "valid_token" # noqa S105 + + # Mock responses for polling: 1st NOT_STARTED, 2nd SUCCESSFUL + mock_response_1 = MagicMock() + mock_response_1.status_code = 200 + mock_response_1.json.return_value = {"state": State.NOT_STARTED.value} + + mock_response_2 = MagicMock() + mock_response_2.status_code = 200 + mock_response_2.json.return_value = {"state": State.SUCCESSFUL.value, "outputs": "file1.csv,file2.csv"} + + mock_get.side_effect = [mock_response_1, mock_response_2] + + job_data = automation.monitor_job(12345, poll_interval=0) + assert job_data["state"] == State.SUCCESSFUL.value + expected_call_count = 2 + assert mock_get.call_count == expected_call_count + + +@pytest.mark.parametrize("state", [State.ERROR.value, State.UNSUCCESSFUL.value]) +@patch("fia_api.scripts.pearl_automation.requests.get") +def test_monitor_job_failure_raises_error(mock_get, get_automation, state): + automation = get_automation + automation.token = "valid_token" # noqa S105 + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"state": state, "status_message": "Something went wrong"} + mock_get.return_value = mock_response + + with pytest.raises(RuntimeError, match="Something went wrong"): + automation.monitor_job(12345) + + +@patch("fia_api.scripts.pearl_automation.requests.get") +@patch("fia_api.scripts.pearl_automation.Path.open", new_callable=unittest.mock.mock_open) +def test_download_results(mock_open, mock_get, get_automation): + automation = get_automation + automation.token = "valid_token" # noqa S105 + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.iter_content.return_value = [b"data1", b"data2"] + mock_get.return_value = mock_response + + automation.download_results(12345, "file1.csv, file2.csv, ") # Added empty entry to test filter + expected_call_count = 2 + + assert mock_get.call_count == expected_call_count + assert mock_open.call_count == expected_call_count + + +def test_download_results_no_outputs(get_automation): + automation = get_automation + with patch("fia_api.scripts.pearl_automation.logger.warning") as mock_log: + automation.download_results(12345, None) + mock_log.assert_called_with("No outputs found for job 12345") + + +@patch("fia_api.scripts.pearl_automation.PearlAutomation.authenticate") +@patch("fia_api.scripts.pearl_automation.PearlAutomation.get_runner_image") +@patch("fia_api.scripts.pearl_automation.PearlAutomation.submit_job") +@patch("fia_api.scripts.pearl_automation.PearlAutomation.monitor_job") +@patch("fia_api.scripts.pearl_automation.PearlAutomation.download_results") +def test_run_success(mock_dl, mock_mon, mock_sub, mock_get_img, mock_auth, get_automation): + automation = get_automation + mock_get_img.return_value = "img" + mock_sub.return_value = 1 + mock_mon.return_value = {"outputs": "out"} + + automation.run() + + mock_auth.assert_called_once() + mock_get_img.assert_called_once() + mock_sub.assert_called_once() + mock_mon.assert_called_once_with(1) + mock_dl.assert_called_once_with(1, "out") + + +@patch("fia_api.scripts.pearl_automation.PearlAutomation.authenticate", side_effect=Exception("Auth fail")) +@patch("fia_api.scripts.pearl_automation.sys.exit") +def test_run_failure(mock_exit: MagicMock, mock_auth: MagicMock, get_automation: PearlAutomation) -> None: + automation = get_automation + automation.run() + mock_exit.assert_called_once_with(1) + + +@patch("fia_api.scripts.pearl_automation.sys.argv", ["pearl_automation.py", "--username", "u", "--password", "p"]) +@patch("fia_api.scripts.pearl_automation.PearlAutomation.run") +def test_main_success(mock_run: MagicMock) -> None: + main() + mock_run.assert_called_once() + + +@patch("fia_api.scripts.pearl_automation.sys.argv", ["pearl_automation.py", "--username", "", "--password", ""]) +@patch("fia_api.scripts.pearl_automation.sys.exit", side_effect=SystemExit) +def test_main_no_creds_exits(mock_exit: MagicMock) -> None: + with patch.dict(os.environ, {}, clear=True), pytest.raises(SystemExit): + main() + mock_exit.assert_called_once_with(1) + + +@patch("fia_api.scripts.pearl_automation.requests.get") +@patch("fia_api.scripts.pearl_automation.Path.open", new_callable=unittest.mock.mock_open) +def test_download_results_list_input( + mock_open: MagicMock, mock_get: MagicMock, get_automation: PearlAutomation +) -> None: + automation = get_automation + automation.token = "valid_token" # noqa: S105 + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.iter_content.return_value = [b"data"] + mock_get.return_value = mock_response + + automation.download_results(12345, ["file1.csv"]) + assert mock_get.call_count == 1 + assert mock_open.call_count == 1 + + +def test_main_entry_point() -> None: + # Run the script as a subprocess to cover the if __name__ == "__main__": block + # We provide invalid args so it exits quickly + result = subprocess.run( # noqa: S603 + [sys.executable, "-m", "fia_api.scripts.pearl_automation", "--username", ""], + capture_output=True, + text=True, + check=False, + ) + assert result.returncode == 1 + assert "Username and password must be provided" in result.stderr