Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 62 additions & 18 deletions agent/src/testflinger_agent/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@

import json
import logging
import signal
import time
from pathlib import Path
from typing import Optional

from testflinger_common.enums import TestPhase
from testflinger_common.enums import JobState, TestEvent, TestPhase

from testflinger_agent.errors import TFServerError
from testflinger_agent.handlers import (
Expand Down Expand Up @@ -197,8 +198,20 @@ def run_test_phase(self, phase, rundir):
phase,
exitcode,
)
if phase == "allocate":
self.allocate_phase(rundir)

# Allocate phase has special behavior where we wait for the parent job
if phase == TestPhase.ALLOCATE:
wait_exit_event, wait_exit_reason = self.allocate_phase(
rundir, global_timeout_checker
)
if wait_exit_event is not None:
# If allocate_phase returns a non-None event
# it means the global timeout was reached
exit_event = wait_exit_event
exit_reason = wait_exit_reason
exitcode = -signal.SIGKILL.value % 256
self._update_phase_results(results_file, phase, exitcode)

return exitcode, exit_event, exit_reason

def _update_phase_results(
Expand Down Expand Up @@ -231,11 +244,19 @@ def _update_phase_results(
exc,
)

def allocate_phase(self, rundir):
"""
Read the json dict from "device-info.json" and send it to the server
def allocate_phase(
self,
rundir: str,
global_timeout_checker: GlobalTimeoutChecker,
) -> tuple[TestEvent, str] | tuple[None, None]:
"""Send device info to server and wait for the parent job completion.

This reads "device-info.json" and sends it to the server
so that the multi-device agent can find the IP addresses of all
subordinate jobs.

:param rundir: Directory in which to look for "device-info.json"
:param global_timeout_checker: timeout checker for the allocate phase.
"""
device_info = self.get_device_info(rundir)

Expand All @@ -249,33 +270,56 @@ def allocate_phase(self, rundir):
logger.warning("Failed to post device_info, retrying...")
time.sleep(60)

self.client.post_job_state(self.job_id, "allocated")
# Multi Device looks for a job that is `allocated` to know when
# a child job has already sent its device info and is ready.
self.client.post_job_state(self.job_id, JobState.ALLOCATED)

self.wait_for_completion()
# Child job is now allocated, wait for parent job to complete
# or for global timeout to be reached.
return self.wait_for_completion(global_timeout_checker)

def wait_for_completion(
    self,
    global_timeout_checker: GlobalTimeoutChecker,
) -> tuple[TestEvent, str] | tuple[None, None]:
    """Monitor the parent job and exit when it completes.

    Polls the server once a minute, watching both this job's state and
    the parent job's state, until either one reaches a terminal state
    or the global timeout fires.

    :param global_timeout_checker: timeout checker for the allocate phase.
    :return: (event, reason) when the global timeout is reached,
        or (None, None) on normal completion.
    """
    while True:
        try:
            this_job_state = self.client.check_job_state(self.job_id)
            if this_job_state in (JobState.COMPLETED, JobState.CANCELLED):
                logger.info("This job completed, exiting...")
                break

            parent_job_id = self.job_data.get("parent_job_id")
            if not parent_job_id:
                # TODO: Remove this path once Spread testing use cases
                # are migrated to reserve phase instead of allocate phase.
                logger.warning("No parent job ID found while allocated")
                # Deliberately fall through to the timeout check and
                # sleep below: a `continue` here would busy-loop against
                # the server and never hit the global timeout, leaving
                # no way out of this loop when no parent job exists.
            else:
                parent_job_state = self.client.check_job_state(
                    parent_job_id
                )
                if parent_job_state in (
                    JobState.COMPLETED,
                    JobState.CANCELLED,
                ):
                    logger.info("Parent job completed, exiting...")
                    break
        except TFServerError:
            # NOTE(review): either the self.job_id or the parent job
            # lookup may have raised; consider separate try blocks for
            # a more specific error message.
            logger.warning("Failed to get allocated job status, retrying")
        event, reason = global_timeout_checker()
        if event is not None:
            # If global timeout is reached, return event and reason
            return event, reason
        time.sleep(60)

    # The job was completed or cancelled
    # Returning a None tuple indicates normal allocate phase completion
    return None, None

def get_global_timeout(self):
"""Get the global timeout for the test run in seconds."""
# Default timeout is 4 hours
Expand Down
135 changes: 133 additions & 2 deletions agent/tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@
import os
import re
import shutil
import signal
import tempfile
import uuid
from http import HTTPStatus
from unittest.mock import patch

import pytest
import requests_mock as rmock
from testflinger_common.enums import TestEvent, TestPhase
from testflinger_common.enums import JobState, TestEvent, TestPhase

import testflinger_agent
from testflinger_agent.client import TestflingerClient as _TestflingerClient
from testflinger_agent.errors import TFServerError
from testflinger_agent.handlers import FileLogHandler
from testflinger_agent.job import (
TestflingerJob as _TestflingerJob,
Expand Down Expand Up @@ -185,9 +187,138 @@ def test_wait_for_completion(self, client):
client.check_job_state = lambda _: "completed"

job = _TestflingerJob({"parent_job_id": "999"}, client)
job.wait_for_completion()
checker = GlobalTimeoutChecker(4 * 60 * 60)
job.wait_for_completion(checker)
# No assertions needed, just make sure we don't timeout

@pytest.mark.timeout(5)
@patch.object(GlobalTimeoutChecker, "__call__")
def test_wait_for_completion_global_timeout(
    self, mock_checker, client, mocker
):
    """Test wait_for_completion returns the timeout event and reason."""
    # Keep the job stuck in the allocate state so that only the
    # global timeout can end the wait loop.
    mocker.patch.object(
        client, "check_job_state", return_value=JobState.ALLOCATE
    )
    expected_reason = "ERROR: Global timeout reached! (1s)"
    mock_checker.return_value = (TestEvent.GLOBAL_TIMEOUT, expected_reason)

    # this job data doesn't contain a parent_job_id on purpose
    # global timeout should trigger for allocate phase
    timeout_sec = 1
    checker = GlobalTimeoutChecker(timeout_sec)
    job = _TestflingerJob({"global_timeout": timeout_sec}, client)

    event, reason = job.wait_for_completion(checker)

    assert event == TestEvent.GLOBAL_TIMEOUT
    assert reason == expected_reason

@patch.object(_TestflingerJob, "allocate_phase")
def test_allocate_phase_global_timeout_exit_values(
    self, mock_allocate, client, tmp_path, requests_mock
):
    """Test exit values on global timeout during allocate phase."""
    self.config["allocate_command"] = "/bin/true"
    outcome_file_path = tmp_path / "testflinger-outcome.json"
    outcome_file_path.write_text("{}")

    requests_mock.get(rmock.ANY, status_code=HTTPStatus.OK)
    requests_mock.post(rmock.ANY, status_code=HTTPStatus.OK)

    # Simulate allocate_phase reporting that the global timeout fired.
    timeout_reason = "ERROR: Global timeout reached! (1s)"
    mock_allocate.return_value = (TestEvent.GLOBAL_TIMEOUT, timeout_reason)

    job = _TestflingerJob(
        {"global_timeout": 1, "allocate_data": {"allocate": True}},
        client,
    )
    exitcode, exit_event, exit_reason = job.run_test_phase(
        "allocate", tmp_path
    )

    # The phase must report a SIGKILL-style exit code together with
    # the timeout event/reason returned by allocate_phase.
    expected_exitcode = -signal.SIGKILL.value % 256
    assert exitcode == expected_exitcode
    assert exit_event == TestEvent.GLOBAL_TIMEOUT
    assert exit_reason == timeout_reason

    # The outcome file must record the same exit code for the phase.
    outcome_data = json.loads(outcome_file_path.read_text())
    assert outcome_data["status"]["allocate"] == expected_exitcode

@patch.object(_TestflingerJob, "allocate_phase")
def test_allocate_phase_normal_completion(
    self, mock_allocate, client, tmp_path, requests_mock
):
    """Test allocate phase completes normally without timeout."""
    self.config["allocate_command"] = "/bin/true"
    (tmp_path / "testflinger-outcome.json").write_text("{}")

    requests_mock.get(rmock.ANY, status_code=HTTPStatus.OK)
    requests_mock.post(rmock.ANY, status_code=HTTPStatus.OK)

    # allocate_phase returns (None, None) on normal completion
    mock_allocate.return_value = (None, None)

    job_data = {"global_timeout": 60, "allocate_data": {"allocate": True}}
    job = _TestflingerJob(job_data, client)
    exitcode, exit_event, _ = job.run_test_phase("allocate", tmp_path)

    assert exitcode == 0
    assert exit_event != TestEvent.GLOBAL_TIMEOUT

@patch("testflinger_agent.job.time.sleep")
@patch.object(_TestflingerJob, "wait_for_completion")
def test_allocate_phase_post_result_retry(
    self, mock_wait, mock_sleep, client, tmp_path, requests_mock
):
    """Test that allocate_phase retries posting device info on failure."""
    mock_wait.return_value = (None, None)
    # First POST fails with a server error, the retry succeeds.
    responses = [
        {"exc": TFServerError(HTTPStatus.INTERNAL_SERVER_ERROR)},
        {"status_code": HTTPStatus.OK},
    ]
    requests_mock.post(rmock.ANY, responses)
    requests_mock.get(rmock.ANY, status_code=HTTPStatus.OK)

    job = _TestflingerJob({}, client)
    job.allocate_phase(tmp_path, GlobalTimeoutChecker(60))

    # post_result should have been called twice (one failure + one retry)
    assert requests_mock.call_count >= 2
    mock_sleep.assert_called_once_with(60)

@pytest.mark.timeout(5)
def test_wait_for_completion_parent_completes(self, client, mocker):
    """Test wait_for_completion exits when the parent job completes."""
    parent_job_id = str(uuid.uuid1())
    job = _TestflingerJob(
        {"parent_job_id": parent_job_id, "job_id": str(uuid.uuid1())},
        client,
    )

    # First call checks this job (allocated),
    # second call checks parent job (completed)
    mocker.patch.object(
        client,
        "check_job_state",
        side_effect=[JobState.ALLOCATED, JobState.COMPLETED],
    )

    event, reason = job.wait_for_completion(GlobalTimeoutChecker(60))

    # Assert that the exit event and reason indicate normal completion
    assert event is None
    assert reason is None

def test_get_device_info(self, client, tmp_path):
"""Test job can read from device-info file."""
# Create device-info.json to simulate device-connector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def runtest(self, args):
logger.info("END testrun")
return exitcode

def allocate(self, _):
    """Allocate devices for multi-agent jobs (default method).

    Base implementation is a no-op; multi-device connectors are
    expected to override it. The unused positional argument is
    presumably the allocate-phase global timeout checker passed by
    the runner -- TODO(review): give it a descriptive name once its
    role is confirmed with the callers.
    """
    pass

Expand Down