From 98bf54848a75c8352f5cbdad4f30ed50dcab8aa2 Mon Sep 17 00:00:00 2001 From: Rene Orozco Martinez Date: Wed, 18 Feb 2026 13:43:40 -0700 Subject: [PATCH 1/6] feat(server): add schema validation for allocate_data --- server/schemas/openapi.json | 12 +++++- server/src/testflinger/api/schemas.py | 8 +++- server/src/testflinger/api/v1.py | 19 ++++++++- server/tests/test_v1.py | 57 +++++++++++++++++++++++++++ 4 files changed, 92 insertions(+), 4 deletions(-) diff --git a/server/schemas/openapi.json b/server/schemas/openapi.json index 53c39433d..66d586f5c 100644 --- a/server/schemas/openapi.json +++ b/server/schemas/openapi.json @@ -89,6 +89,15 @@ ], "type": "object" }, + "AllocateData": { + "additionalProperties": false, + "properties": { + "allocate": { + "type": "boolean" + } + }, + "type": "object" + }, "Attachment": { "additionalProperties": false, "properties": { @@ -187,8 +196,7 @@ "additionalProperties": false, "properties": { "allocate_data": { - "additionalProperties": {}, - "type": "object" + "$ref": "#/components/schemas/AllocateData" }, "allocation_timeout": { "type": "integer" diff --git a/server/src/testflinger/api/schemas.py b/server/src/testflinger/api/schemas.py index cf5531c49..05c319478 100644 --- a/server/src/testflinger/api/schemas.py +++ b/server/src/testflinger/api/schemas.py @@ -343,6 +343,12 @@ class ReserveData(Schema): timeout = DurationField(required=False) +class AllocateData(Schema): + """Schema for the `allocate_data` section of a Testflinger job.""" + + allocate = fields.Boolean(required=False) + + class Job(Schema): """Job schema.""" @@ -361,7 +367,7 @@ class Job(Schema): # i.e. expected fields within `firmware_update_data` firmware_update_data = fields.Dict(required=False) test_data = fields.Nested(TestData, required=False) - allocate_data = fields.Dict(required=False) + allocate_data = fields.Nested(AllocateData, required=False) reserve_data = fields.Nested(ReserveData, required=False) job_status_webhook = fields.String(required=False) job_priority = fields.Integer(required=False) diff --git a/server/src/testflinger/api/v1.py b/server/src/testflinger/api/v1.py index bfbc2ea58..6bd6000c2 100644 --- a/server/src/testflinger/api/v1.py +++ b/server/src/testflinger/api/v1.py @@ -80,12 +80,13 @@ def job_post(json_data: dict) -> dict: """Add a job to the queue.""" job_queue = json_data["job_queue"] exclude_agents = json_data["exclude_agents"] + agents_on_queue = database.get_agents_on_queue(job_queue) if exclude_agents: # Make sure that there are at least some agents in the selected queue # which can run this job. agents_can_run = [ agent - for agent in database.get_agents_on_queue(job_queue) + for agent in agents_on_queue if agent["name"] not in exclude_agents ] if not agents_can_run: @@ -95,6 +96,22 @@ def job_post(json_data: dict) -> dict: "allowed to run this job", ) + # allocate_data is just for multi-device agent provision type + # reject any job with allocate_data if no multi-device agents on the queue + if "allocate_data" in json_data and json_data["allocate_data"].get( + "allocate" + ): + if not any( + agent.get("provision_type") == "multi" for agent in agents_on_queue + ): + abort( + HTTPStatus.UNPROCESSABLE_ENTITY, + message=( + "allocate_data only supported for queues " + "with multi-device agents" + ), + ) + validate_secrets(json_data) try: diff --git a/server/tests/test_v1.py b/server/tests/test_v1.py index fe8c117a3..ccf41cbf6 100644 --- a/server/tests/test_v1.py +++ b/server/tests/test_v1.py @@ -1320,3 +1320,60 @@ def test_get_job_without_agent_id_fails(mongo_app): with app.application.test_client() as raw_client: output = raw_client.get("/v1/job?queue=test") assert output.status_code == HTTPStatus.UNAUTHORIZED + + +@pytest.mark.parametrize( + "provision_type", ["maas", "muxpi", "oem_autoinstall"] +) +def test_reject_job_with_allocate_data_no_multi_device_agents( + mongo_app, provision_type +): + """Test job gets rejected if there are no multi-device agents in queue.""" + app, mongo = mongo_app + + # Setup device with provision type other than 'multi' + mongo.agents.insert_one( + { + "name": "agent1", + "queues": ["test"], + "provision_type": provision_type, + } + ) + + # Submit a job with allocate_data + job_data = {"job_queue": "test", "allocate_data": {"allocate": True}} + output = app.post("/v1/job", json=job_data) + assert output.status_code == HTTPStatus.UNPROCESSABLE_ENTITY + assert ( + "allocate_data only supported for queues with multi-device agents" + in output.json["message"] + ) + + +def test_multi_device_job_allocation(mongo_app): + """Test that a job with allocate_data is valid for a multi-device agent.""" + app, mongo = mongo_app + + # Setup multi-device agent + mongo.agents.insert_one( + { + "name": "agent1", + "queues": ["test"], + "provision_type": "multi", + } + ) + + # Submit a job with allocate_data + job_data = {"job_queue": "test", "allocate_data": {"allocate": True}} + output = app.post("/v1/job", json=job_data) + assert output.status_code == HTTPStatus.OK + job_id = output.json["job_id"] + + # Register as the multi-device agent and get the job + app.post( + "/v1/agents/data/agent1", + json={"state": "waiting", "queues": ["test"], "location": "here"}, + ) + output = app.get("/v1/job?queue=test") + assert output.status_code == HTTPStatus.OK + assert output.json["job_id"] == job_id From 81c3dd2c6fb6aab3bd7d4907ed4b0c758b433079 Mon Sep 17 00:00:00 2001 From: Rene Orozco Martinez Date: Wed, 18 Feb 2026 16:30:03 -0700 Subject: [PATCH 2/6] Revert "feat(server): add schema validation for allocate_data" This reverts commit 3614347f4529ffb8ca89deaf169d9f6193ac0952. --- server/schemas/openapi.json | 12 +----- server/src/testflinger/api/schemas.py | 8 +--- server/src/testflinger/api/v1.py | 19 +-------- server/tests/test_v1.py | 57 --------------------------- 4 files changed, 4 insertions(+), 92 deletions(-) diff --git a/server/schemas/openapi.json b/server/schemas/openapi.json index 66d586f5c..53c39433d 100644 --- a/server/schemas/openapi.json +++ b/server/schemas/openapi.json @@ -89,15 +89,6 @@ ], "type": "object" }, - "AllocateData": { - "additionalProperties": false, - "properties": { - "allocate": { - "type": "boolean" - } - }, - "type": "object" - }, "Attachment": { "additionalProperties": false, "properties": { @@ -196,7 +187,8 @@ "additionalProperties": false, "properties": { "allocate_data": { - "$ref": "#/components/schemas/AllocateData" + "additionalProperties": {}, + "type": "object" }, "allocation_timeout": { "type": "integer" diff --git a/server/src/testflinger/api/schemas.py b/server/src/testflinger/api/schemas.py index 05c319478..cf5531c49 100644 --- a/server/src/testflinger/api/schemas.py +++ b/server/src/testflinger/api/schemas.py @@ -343,12 +343,6 @@ class ReserveData(Schema): timeout = DurationField(required=False) -class AllocateData(Schema): - """Schema for the `allocate_data` section of a Testflinger job.""" - - allocate = fields.Boolean(required=False) - - class Job(Schema): """Job schema.""" @@ -367,7 +361,7 @@ class Job(Schema): # i.e. expected fields within `firmware_update_data` firmware_update_data = fields.Dict(required=False) test_data = fields.Nested(TestData, required=False) - allocate_data = fields.Nested(AllocateData, required=False) + allocate_data = fields.Dict(required=False) reserve_data = fields.Nested(ReserveData, required=False) job_status_webhook = fields.String(required=False) job_priority = fields.Integer(required=False) diff --git a/server/src/testflinger/api/v1.py b/server/src/testflinger/api/v1.py index 6bd6000c2..bfbc2ea58 100644 --- a/server/src/testflinger/api/v1.py +++ b/server/src/testflinger/api/v1.py @@ -80,13 +80,12 @@ def job_post(json_data: dict) -> dict: """Add a job to the queue.""" job_queue = json_data["job_queue"] exclude_agents = json_data["exclude_agents"] - agents_on_queue = database.get_agents_on_queue(job_queue) if exclude_agents: # Make sure that there are at least some agents in the selected queue # which can run this job. agents_can_run = [ agent - for agent in agents_on_queue + for agent in database.get_agents_on_queue(job_queue) if agent["name"] not in exclude_agents ] if not agents_can_run: @@ -96,22 +95,6 @@ def job_post(json_data: dict) -> dict: "allowed to run this job", ) - # allocate_data is just for multi-device agent provision type - # reject any job with allocate_data if no multi-device agents on the queue - if "allocate_data" in json_data and json_data["allocate_data"].get( - "allocate" - ): - if not any( - agent.get("provision_type") == "multi" for agent in agents_on_queue - ): - abort( - HTTPStatus.UNPROCESSABLE_ENTITY, - message=( - "allocate_data only supported for queues " - "with multi-device agents" - ), - ) - validate_secrets(json_data) try: diff --git a/server/tests/test_v1.py b/server/tests/test_v1.py index ccf41cbf6..fe8c117a3 100644 --- a/server/tests/test_v1.py +++ b/server/tests/test_v1.py @@ -1320,60 +1320,3 @@ def test_get_job_without_agent_id_fails(mongo_app): with app.application.test_client() as raw_client: output = raw_client.get("/v1/job?queue=test") assert output.status_code == HTTPStatus.UNAUTHORIZED - - -@pytest.mark.parametrize( - "provision_type", ["maas", "muxpi", "oem_autoinstall"] -) -def test_reject_job_with_allocate_data_no_multi_device_agents( - mongo_app, provision_type -): - """Test job gets rejected if there are no multi-device agents in queue.""" - app, mongo = mongo_app - - # Setup device with provision type other than 'multi' - mongo.agents.insert_one( - { - "name": "agent1", - "queues": ["test"], - "provision_type": provision_type, - } - ) - - # Submit a job with allocate_data - job_data = {"job_queue": "test", "allocate_data": {"allocate": True}} - output = app.post("/v1/job", json=job_data) - assert output.status_code == HTTPStatus.UNPROCESSABLE_ENTITY - assert ( - "allocate_data only supported for queues with multi-device agents" - in output.json["message"] - ) - - -def test_multi_device_job_allocation(mongo_app): - """Test that a job with allocate_data is valid for a multi-device agent.""" - app, mongo = mongo_app - - # Setup multi-device agent - mongo.agents.insert_one( - { - "name": "agent1", - "queues": ["test"], - "provision_type": "multi", - } - ) - - # Submit a job with allocate_data - job_data = {"job_queue": "test", "allocate_data": {"allocate": True}} - output = app.post("/v1/job", json=job_data) - assert output.status_code == HTTPStatus.OK - job_id = output.json["job_id"] - - # Register as the multi-device agent and get the job - app.post( - "/v1/agents/data/agent1", - json={"state": "waiting", "queues": ["test"], "location": "here"}, - ) - output = app.get("/v1/job?queue=test") - assert output.status_code == HTTPStatus.OK - assert output.json["job_id"] == job_id From 7868cdfe386e535b2290e7db86102f16cbb0b7f4 Mon Sep 17 00:00:00 2001 From: Rene Orozco Martinez Date: Wed, 18 Feb 2026 17:53:52 -0700 Subject: [PATCH 3/6] fix(agent): include global timeout during allocate phase --- agent/src/testflinger_agent/job.py | 52 ++++++++++++++++++++------- agent/tests/test_job.py | 56 +++++++++++++++++++++++++++++- 2 files changed, 95 insertions(+), 13 deletions(-) diff --git a/agent/src/testflinger_agent/job.py b/agent/src/testflinger_agent/job.py index 41b0e82ba..f1d8d2243 100644 --- a/agent/src/testflinger_agent/job.py +++ b/agent/src/testflinger_agent/job.py @@ -14,11 +14,12 @@ import json import logging +import signal import time from pathlib import Path from typing import Optional -from testflinger_common.enums import TestPhase +from testflinger_common.enums import TestEvent, TestPhase from testflinger_agent.errors import TFServerError from testflinger_agent.handlers import ( @@ -197,8 +198,18 @@ def run_test_phase(self, phase, rundir): phase, exitcode, ) + + # Allocate phase has special behavior where we wait for the parent job if phase == "allocate": - self.allocate_phase(rundir) + wait_exit_event, wait_exit_reason = self.allocate_phase(rundir) + if wait_exit_event is not None: + # If allocate_phase returns a non-None event + # it means the global timeout was reached + exit_event = wait_exit_event + exit_reason = wait_exit_reason + exitcode = -signal.SIGKILL.value % 256 + self._update_phase_results(results_file, phase, exitcode) + return exitcode, exit_event, exit_reason def _update_phase_results( @@ -231,7 +242,9 @@ def _update_phase_results( exc, ) - def allocate_phase(self, rundir): + def allocate_phase( + self, rundir + ) -> tuple[TestEvent, str] | tuple[None, None]: """ Read the json dict from "device-info.json" and send it to the server so that the multi-device agent can find the IP addresses of all @@ -251,10 +264,13 @@ def allocate_phase(self, rundir): self.client.post_job_state(self.job_id, "allocated") - self.wait_for_completion() + return self.wait_for_completion() - def wait_for_completion(self): + def wait_for_completion(self) -> tuple[TestEvent, str] | tuple[None, None]: """Monitor the parent job and exit when it completes.""" + global_timeout_checker = GlobalTimeoutChecker( + self.get_global_timeout() + ) while True: try: this_job_state = self.client.check_job_state(self.job_id) @@ -265,17 +281,29 @@ def wait_for_completion(self): parent_job_id = self.job_data.get("parent_job_id") if not parent_job_id: logger.warning("No parent job ID found while allocated") - continue - parent_job_state = self.client.check_job_state( - self.job_data.get("parent_job_id") - ) - if parent_job_state in ("complete", "completed", "cancelled"): - logger.info("Parent job completed, exiting...") - break + else: + parent_job_state = self.client.check_job_state( + parent_job_id + ) + if parent_job_state in ( + "complete", + "completed", + "cancelled", + ): + logger.info("Parent job completed, exiting...") + break except TFServerError: logger.warning("Failed to get allocated job status, retrying") + event, reason = global_timeout_checker() + if event is not None: + # If global timeout is reached, return event and reason + return event, reason time.sleep(60) + # The job was completed or cancelled + # Returning a None tuple indicates normal allocate phase completion + return None, None + def get_global_timeout(self): """Get the global timeout for the test run in seconds.""" # Default timeout is 4 hours diff --git a/agent/tests/test_job.py b/agent/tests/test_job.py index e67bbf2be..45158ae86 100644 --- a/agent/tests/test_job.py +++ b/agent/tests/test_job.py @@ -2,6 +2,7 @@ import os import re import shutil +import signal import tempfile import uuid from http import HTTPStatus @@ -9,7 +10,7 @@ import pytest import requests_mock as rmock -from testflinger_common.enums import TestEvent, TestPhase +from testflinger_common.enums import JobState, TestEvent, TestPhase import testflinger_agent from testflinger_agent.client import TestflingerClient as _TestflingerClient @@ -188,6 +189,59 @@ def test_wait_for_completion(self, client): job.wait_for_completion() # No assertions needed, just make sure we don't timeout + @pytest.mark.timeout(5) + @patch.object(GlobalTimeoutChecker, "__call__") + def test_wait_for_completion_global_timeout( + self, mock_checker, client, mocker + ): + """Test wait_for_completion returns the timeout event and reason.""" + mocker.patch.object( + client, "check_job_state", return_value=JobState.ALLOCATE + ) + timeout_reason = "ERROR: Global timeout reached! (1s)" + mock_checker.return_value = (TestEvent.GLOBAL_TIMEOUT, timeout_reason) + + # this job data doesn't contain a parent_job_id on purpose + # global timeout should trigger for allocate phase + job = _TestflingerJob({"global_timeout": 1}, client) + event, reason = job.wait_for_completion() + + assert event == TestEvent.GLOBAL_TIMEOUT + assert reason == timeout_reason + + @patch.object(_TestflingerJob, "allocate_phase") + def test_allocate_phase_global_timeout_exit_values( + self, mock_allocate, client, tmp_path, requests_mock + ): + """Test exit values on global timeout during allocate phase.""" + self.config["allocate_command"] = "/bin/true" + fake_job_data = { + "global_timeout": 1, + "allocate_data": {"allocate": True}, + } + outcome_file_path = tmp_path / "testflinger-outcome.json" + outcome_file_path.write_text("{}") + + requests_mock.post(rmock.ANY, status_code=HTTPStatus.OK) + requests_mock.get(rmock.ANY, status_code=HTTPStatus.OK) + + timeout_reason = "ERROR: Global timeout reached! (1s)" + mock_allocate.return_value = (TestEvent.GLOBAL_TIMEOUT, timeout_reason) + job = _TestflingerJob(fake_job_data, client) + exitcode, exit_event, exit_reason = job.run_test_phase( + "allocate", tmp_path + ) + + assert exitcode == -signal.SIGKILL.value % 256 + assert exit_event == TestEvent.GLOBAL_TIMEOUT + assert exit_reason == timeout_reason + + with outcome_file_path.open() as f: + outcome_data = json.load(f) + assert ( + outcome_data["status"]["allocate"] == -signal.SIGKILL.value % 256 + ) + def test_get_device_info(self, client, tmp_path): """Test job can read from device-info file.""" # Create device-info.json to simulate device-connector From 58df8ea0d4377ca51b72971e3b7938d1c17bac0b Mon Sep 17 00:00:00 2001 From: Rene Orozco Martinez Date: Tue, 24 Feb 2026 14:58:45 -0700 Subject: [PATCH 4/6] set global timeout checker at the beginning of the phase --- agent/src/testflinger_agent/job.py | 49 ++++++++++++++++++++---------- agent/tests/test_job.py | 9 ++++-- 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/agent/src/testflinger_agent/job.py b/agent/src/testflinger_agent/job.py index f1d8d2243..666c5f69f 100644 --- a/agent/src/testflinger_agent/job.py +++ b/agent/src/testflinger_agent/job.py @@ -19,7 +19,7 @@ from pathlib import Path from typing import Optional -from testflinger_common.enums import TestEvent, TestPhase +from testflinger_common.enums import JobState, TestEvent, TestPhase from testflinger_agent.errors import TFServerError from testflinger_agent.handlers import ( @@ -200,8 +200,10 @@ def run_test_phase(self, phase, rundir): ) # Allocate phase has special behavior where we wait for the parent job - if phase == "allocate": - wait_exit_event, wait_exit_reason = self.allocate_phase(rundir) + if phase == TestPhase.ALLOCATE: + wait_exit_event, wait_exit_reason = self.allocate_phase( + rundir, global_timeout_checker + ) if wait_exit_event is not None: # If allocate_phase returns a non-None event # it means the global timeout was reached @@ -243,12 +245,18 @@ def _update_phase_results( ) def allocate_phase( - self, rundir + self, + rundir: str, + global_timeout_checker: GlobalTimeoutChecker, ) -> tuple[TestEvent, str] | tuple[None, None]: - """ - Read the json dict from "device-info.json" and send it to the server + """Send device info to server and wait for the parent job completion. + + This reads "device-info.json" and sends it to the server so that the multi-device agent can find the IP addresses of all subordinate jobs. + + :param rundir: Directory in which to look for "device-info.json" + :param global_timeout_checker: timeout checker for the allocate phase. """ device_info = self.get_device_info(rundir) @@ -262,15 +270,22 @@ def allocate_phase( logger.warning("Failed to post device_info, retrying...") time.sleep(60) - self.client.post_job_state(self.job_id, "allocated") + # Multi Device looks for a job that is `allocated` to know when + # a child job has already sent its device info and is ready. + self.client.post_job_state(self.job_id, JobState.ALLOCATED) - return self.wait_for_completion() + # Child job is now allocated, wait for parent job to complete + # or for global timeout to be reached. + return self.wait_for_completion(global_timeout_checker) - def wait_for_completion(self) -> tuple[TestEvent, str] | tuple[None, None]: - """Monitor the parent job and exit when it completes.""" - global_timeout_checker = GlobalTimeoutChecker( - self.get_global_timeout() - ) + def wait_for_completion( + self, + global_timeout_checker: GlobalTimeoutChecker, + ) -> tuple[TestEvent, str] | tuple[None, None]: + """Monitor the parent job and exit when it completes. + + :param global_timeout_checker: timeout checker for the allocate phase. + """ while True: try: this_job_state = self.client.check_job_state(self.job_id) @@ -280,15 +295,17 @@ def wait_for_completion(self) -> tuple[TestEvent, str] | tuple[None, None]: parent_job_id = self.job_data.get("parent_job_id") if not parent_job_id: + # TODO: Remove this path once Spread testing use cases + # are migrated to reserve phase instead of allocate phase. logger.warning("No parent job ID found while allocated") else: parent_job_state = self.client.check_job_state( parent_job_id ) if parent_job_state in ( - "complete", - "completed", - "cancelled", + JobState.COMPLETE, + JobState.COMPLETED, + JobState.CANCELLED, ): logger.info("Parent job completed, exiting...") break diff --git a/agent/tests/test_job.py b/agent/tests/test_job.py index 45158ae86..4264fe3f5 100644 --- a/agent/tests/test_job.py +++ b/agent/tests/test_job.py @@ -186,7 +186,8 @@ def test_wait_for_completion(self, client): client.check_job_state = lambda _: "completed" job = _TestflingerJob({"parent_job_id": "999"}, client) - job.wait_for_completion() + checker = GlobalTimeoutChecker(4 * 60 * 60) + job.wait_for_completion(checker) # No assertions needed, just make sure we don't timeout @pytest.mark.timeout(5) @@ -203,8 +204,10 @@ def test_wait_for_completion_global_timeout( # this job data doesn't contain a parent_job_id on purpose # global timeout should trigger for allocate phase - job = _TestflingerJob({"global_timeout": 1}, client) - event, reason = job.wait_for_completion() + timeout_sec = 1 + job = _TestflingerJob({"global_timeout": timeout_sec}, client) + checker = GlobalTimeoutChecker(timeout_sec) + event, reason = job.wait_for_completion(checker) assert event == TestEvent.GLOBAL_TIMEOUT assert reason == timeout_reason From 8a3e71cfb5640d2ca73efba1f95ff18becceacfd Mon Sep 17 00:00:00 2001 From: Rene Orozco Martinez Date: Tue, 24 Feb 2026 15:27:45 -0700 Subject: [PATCH 5/6] add unit tests --- agent/src/testflinger_agent/job.py | 3 +- agent/tests/test_job.py | 74 ++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/agent/src/testflinger_agent/job.py b/agent/src/testflinger_agent/job.py index 666c5f69f..e3807499e 100644 --- a/agent/src/testflinger_agent/job.py +++ b/agent/src/testflinger_agent/job.py @@ -289,7 +289,7 @@ def wait_for_completion( while True: try: this_job_state = self.client.check_job_state(self.job_id) - if this_job_state in ("complete", "completed", "cancelled"): + if this_job_state in (JobState.COMPLETED, JobState.CANCELLED): logger.info("This job completed, exiting...") break @@ -303,7 +303,6 @@ def wait_for_completion( parent_job_id ) if parent_job_state in ( - JobState.COMPLETE, JobState.COMPLETED, JobState.CANCELLED, ): diff --git a/agent/tests/test_job.py b/agent/tests/test_job.py index 4264fe3f5..2315209bc 100644 --- a/agent/tests/test_job.py +++ b/agent/tests/test_job.py @@ -14,6 +14,7 @@ import testflinger_agent from testflinger_agent.client import TestflingerClient as _TestflingerClient +from testflinger_agent.errors import TFServerError from testflinger_agent.handlers import FileLogHandler from testflinger_agent.job import ( TestflingerJob as _TestflingerJob, @@ -245,6 +246,79 @@ def test_allocate_phase_global_timeout_exit_values( outcome_data["status"]["allocate"] == -signal.SIGKILL.value % 256 ) + @patch.object(_TestflingerJob, "allocate_phase") + def test_allocate_phase_normal_completion( + self, mock_allocate, client, tmp_path, requests_mock + ): + """Test allocate phase completes normally without timeout.""" + self.config["allocate_command"] = "/bin/true" + fake_job_data = { + "global_timeout": 60, + "allocate_data": {"allocate": True}, + } + outcome_file_path = tmp_path / "testflinger-outcome.json" + outcome_file_path.write_text("{}") + + requests_mock.post(rmock.ANY, status_code=HTTPStatus.OK) + requests_mock.get(rmock.ANY, status_code=HTTPStatus.OK) + + # allocate_phase returns (None, None) on normal completion + mock_allocate.return_value = (None, None) + job = _TestflingerJob(fake_job_data, client) + exitcode, exit_event, exit_reason = job.run_test_phase( + "allocate", tmp_path + ) + + assert exitcode == 0 + assert exit_event != TestEvent.GLOBAL_TIMEOUT + + @patch("testflinger_agent.job.time.sleep") + @patch.object(_TestflingerJob, "wait_for_completion") + def test_allocate_phase_post_result_retry( + self, mock_wait, mock_sleep, client, tmp_path, requests_mock + ): + """Test that allocate_phase retries posting device info on failure.""" + mock_wait.return_value = (None, None) + requests_mock.post( + rmock.ANY, + [ + {"exc": TFServerError(HTTPStatus.INTERNAL_SERVER_ERROR)}, + {"status_code": HTTPStatus.OK}, + ], + ) + requests_mock.get(rmock.ANY, status_code=HTTPStatus.OK) + + checker = GlobalTimeoutChecker(60) + job = _TestflingerJob({}, client) + job.allocate_phase(tmp_path, checker) + + # post_result should have been called twice (one failure + one retry) + assert requests_mock.call_count >= 2 + mock_sleep.assert_called_once_with(60) + + @pytest.mark.timeout(5) + def test_wait_for_completion_parent_completes(self, client, mocker): + """Test wait_for_completion exits when the parent job completes.""" + parent_job_id = str(uuid.uuid1()) + this_job_id = str(uuid.uuid1()) + fake_job_data = {"parent_job_id": parent_job_id, "job_id": this_job_id} + job = _TestflingerJob(fake_job_data, client) + + # First call checks this job (allocated), + # second call checks parent job (completed) + mocker.patch.object( + client, + "check_job_state", + side_effect=[JobState.ALLOCATED, JobState.COMPLETED], + ) + + checker = GlobalTimeoutChecker(60) + event, reason = job.wait_for_completion(checker) + + # Assert that the exit event and reason indicate normal completion + assert event is None + assert reason is None + def test_get_device_info(self, client, tmp_path): """Test job can read from device-info file.""" # Create device-info.json to simulate device-connector From d104f1d86beef5657c9e28aca4da148c012f070a Mon Sep 17 00:00:00 2001 From: Rene Orozco Martinez Date: Tue, 24 Feb 2026 15:39:04 -0700 Subject: [PATCH 6/6] devices: update allocate method --- .../src/testflinger_device_connectors/devices/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/device-connectors/src/testflinger_device_connectors/devices/__init__.py b/device-connectors/src/testflinger_device_connectors/devices/__init__.py index aac6fc929..abe4f290d 100644 --- a/device-connectors/src/testflinger_device_connectors/devices/__init__.py +++ b/device-connectors/src/testflinger_device_connectors/devices/__init__.py @@ -265,7 +265,7 @@ def runtest(self, args): logger.info("END testrun") return exitcode - def allocate(self): + def allocate(self, _): """Allocate devices for multi-agent jobs (default method).""" pass