diff --git a/agent/src/testflinger_agent/job.py b/agent/src/testflinger_agent/job.py index 41b0e82ba..e3807499e 100644 --- a/agent/src/testflinger_agent/job.py +++ b/agent/src/testflinger_agent/job.py @@ -14,11 +14,12 @@ import json import logging +import signal import time from pathlib import Path from typing import Optional -from testflinger_common.enums import TestPhase +from testflinger_common.enums import JobState, TestEvent, TestPhase from testflinger_agent.errors import TFServerError from testflinger_agent.handlers import ( @@ -197,8 +198,20 @@ def run_test_phase(self, phase, rundir): phase, exitcode, ) - if phase == "allocate": - self.allocate_phase(rundir) + + # Allocate phase has special behavior where we wait for the parent job + if phase == TestPhase.ALLOCATE: + wait_exit_event, wait_exit_reason = self.allocate_phase( + rundir, global_timeout_checker + ) + if wait_exit_event is not None: + # If allocate_phase returns a non-None event + # it means the global timeout was reached + exit_event = wait_exit_event + exit_reason = wait_exit_reason + exitcode = -signal.SIGKILL.value % 256 + self._update_phase_results(results_file, phase, exitcode) + return exitcode, exit_event, exit_reason def _update_phase_results( @@ -231,11 +244,19 @@ def _update_phase_results( exc, ) - def allocate_phase(self, rundir): - """ - Read the json dict from "device-info.json" and send it to the server + def allocate_phase( + self, + rundir: str, + global_timeout_checker: GlobalTimeoutChecker, + ) -> tuple[TestEvent, str] | tuple[None, None]: + """Send device info to server and wait for the parent job completion. + + This reads "device-info.json" and sends it to the server so that the multi-device agent can find the IP addresses of all subordinate jobs. + + :param rundir: Directory in which to look for "device-info.json" + :param global_timeout_checker: timeout checker for the allocate phase. """ device_info = self.get_device_info(rundir) @@ -249,33 +270,56 @@ def allocate_phase(self, rundir): logger.warning("Failed to post device_info, retrying...") time.sleep(60) - self.client.post_job_state(self.job_id, "allocated") + # Multi Device looks for a job that is `allocated` to know when + # a child job has already sent its device info and is ready. + self.client.post_job_state(self.job_id, JobState.ALLOCATED) - self.wait_for_completion() + # Child job is now allocated, wait for parent job to complete + # or for global timeout to be reached. + return self.wait_for_completion(global_timeout_checker) - def wait_for_completion(self): - """Monitor the parent job and exit when it completes.""" + def wait_for_completion( + self, + global_timeout_checker: GlobalTimeoutChecker, + ) -> tuple[TestEvent, str] | tuple[None, None]: + """Monitor the parent job and exit when it completes. + + :param global_timeout_checker: timeout checker for the allocate phase. + """ while True: try: this_job_state = self.client.check_job_state(self.job_id) - if this_job_state in ("complete", "completed", "cancelled"): + if this_job_state in (JobState.COMPLETED, JobState.CANCELLED): logger.info("This job completed, exiting...") break parent_job_id = self.job_data.get("parent_job_id") if not parent_job_id: + # TODO: Remove this path once Spread testing use cases + # are migrated to reserve phase instead of allocate phase. logger.warning("No parent job ID found while allocated") - continue - parent_job_state = self.client.check_job_state( - self.job_data.get("parent_job_id") - ) - if parent_job_state in ("complete", "completed", "cancelled"): - logger.info("Parent job completed, exiting...") - break + else: + parent_job_state = self.client.check_job_state( + parent_job_id + ) + if parent_job_state in ( + JobState.COMPLETED, + JobState.CANCELLED, + ): + logger.info("Parent job completed, exiting...") + break except TFServerError: logger.warning("Failed to get allocated job status, retrying") + event, reason = global_timeout_checker() + if event is not None: + # If global timeout is reached, return event and reason + return event, reason time.sleep(60) + # The job was completed or cancelled + # Returning a None tuple indicates normal allocate phase completion + return None, None + def get_global_timeout(self): """Get the global timeout for the test run in seconds.""" # Default timeout is 4 hours diff --git a/agent/tests/test_job.py b/agent/tests/test_job.py index e67bbf2be..2315209bc 100644 --- a/agent/tests/test_job.py +++ b/agent/tests/test_job.py @@ -2,6 +2,7 @@ import os import re import shutil +import signal import tempfile import uuid from http import HTTPStatus @@ -9,10 +10,11 @@ import pytest import requests_mock as rmock -from testflinger_common.enums import TestEvent, TestPhase +from testflinger_common.enums import JobState, TestEvent, TestPhase import testflinger_agent from testflinger_agent.client import TestflingerClient as _TestflingerClient +from testflinger_agent.errors import TFServerError from testflinger_agent.handlers import FileLogHandler from testflinger_agent.job import ( TestflingerJob as _TestflingerJob, @@ -185,9 +187,138 @@ def test_wait_for_completion(self, client): client.check_job_state = lambda _: "completed" job = _TestflingerJob({"parent_job_id": "999"}, client) - job.wait_for_completion() + checker = GlobalTimeoutChecker(4 * 60 * 60) + job.wait_for_completion(checker) # No assertions needed, just make sure we don't timeout + @pytest.mark.timeout(5) + @patch.object(GlobalTimeoutChecker, "__call__") + def test_wait_for_completion_global_timeout( + self, mock_checker, client, mocker + ): + """Test wait_for_completion returns the timeout event and reason.""" + mocker.patch.object( + client, "check_job_state", return_value=JobState.ALLOCATE + ) + timeout_reason = "ERROR: Global timeout reached! (1s)" + mock_checker.return_value = (TestEvent.GLOBAL_TIMEOUT, timeout_reason) + + # this job data doesn't contain a parent_job_id on purpose + # global timeout should trigger for allocate phase + timeout_sec = 1 + job = _TestflingerJob({"global_timeout": timeout_sec}, client) + checker = GlobalTimeoutChecker(timeout_sec) + event, reason = job.wait_for_completion(checker) + + assert event == TestEvent.GLOBAL_TIMEOUT + assert reason == timeout_reason + + @patch.object(_TestflingerJob, "allocate_phase") + def test_allocate_phase_global_timeout_exit_values( + self, mock_allocate, client, tmp_path, requests_mock + ): + """Test exit values on global timeout during allocate phase.""" + self.config["allocate_command"] = "/bin/true" + fake_job_data = { + "global_timeout": 1, + "allocate_data": {"allocate": True}, + } + outcome_file_path = tmp_path / "testflinger-outcome.json" + outcome_file_path.write_text("{}") + + requests_mock.post(rmock.ANY, status_code=HTTPStatus.OK) + requests_mock.get(rmock.ANY, status_code=HTTPStatus.OK) + + timeout_reason = "ERROR: Global timeout reached! (1s)" + mock_allocate.return_value = (TestEvent.GLOBAL_TIMEOUT, timeout_reason) + job = _TestflingerJob(fake_job_data, client) + exitcode, exit_event, exit_reason = job.run_test_phase( + "allocate", tmp_path + ) + + assert exitcode == -signal.SIGKILL.value % 256 + assert exit_event == TestEvent.GLOBAL_TIMEOUT + assert exit_reason == timeout_reason + + with outcome_file_path.open() as f: + outcome_data = json.load(f) + assert ( + outcome_data["status"]["allocate"] == -signal.SIGKILL.value % 256 + ) + + @patch.object(_TestflingerJob, "allocate_phase") + def test_allocate_phase_normal_completion( + self, mock_allocate, client, tmp_path, requests_mock + ): + """Test allocate phase completes normally without timeout.""" + self.config["allocate_command"] = "/bin/true" + fake_job_data = { + "global_timeout": 60, + "allocate_data": {"allocate": True}, + } + outcome_file_path = tmp_path / "testflinger-outcome.json" + outcome_file_path.write_text("{}") + + requests_mock.post(rmock.ANY, status_code=HTTPStatus.OK) + requests_mock.get(rmock.ANY, status_code=HTTPStatus.OK) + + # allocate_phase returns (None, None) on normal completion + mock_allocate.return_value = (None, None) + job = _TestflingerJob(fake_job_data, client) + exitcode, exit_event, exit_reason = job.run_test_phase( + "allocate", tmp_path + ) + + assert exitcode == 0 + assert exit_event != TestEvent.GLOBAL_TIMEOUT + + @patch("testflinger_agent.job.time.sleep") + @patch.object(_TestflingerJob, "wait_for_completion") + def test_allocate_phase_post_result_retry( + self, mock_wait, mock_sleep, client, tmp_path, requests_mock + ): + """Test that allocate_phase retries posting device info on failure.""" + mock_wait.return_value = (None, None) + requests_mock.post( + rmock.ANY, + [ + {"exc": TFServerError(HTTPStatus.INTERNAL_SERVER_ERROR)}, + {"status_code": HTTPStatus.OK}, + ], + ) + requests_mock.get(rmock.ANY, status_code=HTTPStatus.OK) + + checker = GlobalTimeoutChecker(60) + job = _TestflingerJob({}, client) + job.allocate_phase(tmp_path, checker) + + # post_result should have been called twice (one failure + one retry) + assert requests_mock.call_count >= 2 + mock_sleep.assert_called_once_with(60) + + @pytest.mark.timeout(5) + def test_wait_for_completion_parent_completes(self, client, mocker): + """Test wait_for_completion exits when the parent job completes.""" + parent_job_id = str(uuid.uuid1()) + this_job_id = str(uuid.uuid1()) + fake_job_data = {"parent_job_id": parent_job_id, "job_id": this_job_id} + job = _TestflingerJob(fake_job_data, client) + + # First call checks this job (allocated), + # second call checks parent job (completed) + mocker.patch.object( + client, + "check_job_state", + side_effect=[JobState.ALLOCATED, JobState.COMPLETED], + ) + + checker = GlobalTimeoutChecker(60) + event, reason = job.wait_for_completion(checker) + + # Assert that the exit event and reason indicate normal completion + assert event is None + assert reason is None + def test_get_device_info(self, client, tmp_path): """Test job can read from device-info file.""" # Create device-info.json to simulate device-connector diff --git a/device-connectors/src/testflinger_device_connectors/devices/__init__.py b/device-connectors/src/testflinger_device_connectors/devices/__init__.py index aac6fc929..abe4f290d 100644 --- a/device-connectors/src/testflinger_device_connectors/devices/__init__.py +++ b/device-connectors/src/testflinger_device_connectors/devices/__init__.py @@ -265,7 +265,7 @@ def runtest(self, args): logger.info("END testrun") return exitcode - def allocate(self): + def allocate(self, _): """Allocate devices for multi-agent jobs (default method).""" pass