Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 108 additions & 25 deletions os_tests/libs/resources_oci.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
LaunchInstanceShapeConfigDetails,
CreateVolumeDetails,
AttachIScsiVolumeDetails,
AttachParavirtualizedVolumeDetails,
CreateVcnDetails,
)
except ImportError:
Expand Down Expand Up @@ -438,11 +439,10 @@ def is_started(self):
def attach_block(self, disk, target, wait=True, timeout=120):
try:
LOG.info("try to attach {} to {}".format(disk.id, self.id))
attach_details = AttachIScsiVolumeDetails(
attach_details = AttachParavirtualizedVolumeDetails(
display_name=target,
instance_id=self.id,
volume_id=disk.id,
type='iscsi'
volume_id=disk.id
)
response = self.compute_client.attach_volume(attach_details)
if wait:
Expand Down Expand Up @@ -485,11 +485,70 @@ def detach_block(self, disk, wait=True, force=False):
LOG.error("Failed to detach volume: {}".format(err))
return False

def attach_nic(self, nic, wait=True, timeout=120):
raise NotImplementedError
def attach_nic(self, nic, wait=True, timeout=120, **kwargs):
try:
LOG.info("try to attach nic to {}".format(self.id))
from oci.core.models import AttachVnicDetails, CreateVnicDetails
attach_vnic_details = AttachVnicDetails(
instance_id=self.id,
create_vnic_details=CreateVnicDetails(
subnet_id=nic.subnet_id,
assign_public_ip=False,
display_name=nic.tag,
Comment on lines +488 to +497
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Add tests covering the new OCI NIC attach/detach behaviors and OCINIC lifecycle helpers.

The new attach_nic/detach_nic logic and OCINIC lifecycle changes significantly alter behavior (vnic_attachment_id handling, is_exist/is_free, and create/delete becoming attach/detach wrappers), but there are no tests covering these flows.

Please add unit tests (with mocked OCI clients) that verify at least:

  • attach_nic calls attach_vnic with the expected CreateVnicDetails (assign_public_ip=False, display_name, freeform tags), waits, sets nic.vnic_attachment_id and nic.id, and returns True.
  • attach_nic returns False when attach_vnic or wait_until raises.
  • detach_nic resolves vnic_attachment_id via list_vnic_attachments when missing, handles the "no attached vnic" case, and on success clears both nic.vnic_attachment_id and nic.id.
  • OCINIC create, delete, get_state, is_exist, and is_free behave correctly when id/vnic_attachment_id are None.

Mock-based unit tests should be sufficient to validate orchestration and state transitions without real OCI infrastructure.

Suggested implementation:

import unittest
from unittest import mock

# Adjust these imports to match the actual module / class names in your codebase.
# They assume that:
# - `OCIInstance` is the instance resource class with `attach_nic` / `detach_nic`.
# - `OCINIC` is the NIC resource class with `create` / `delete` / `get_state` / `is_exist` / `is_free`.
from os_tests.libs import resources_oci


class DummyNIC:
    """Simple stand‑in NIC object for attach_nic/detach_nic orchestration tests."""

    def __init__(self, subnet_id="subnet-ocid", tag="nic-tag"):
        self.subnet_id = subnet_id
        self.tag = tag
        self.vnic_attachment_id = None
        self.id = None


class TestOCINICAttachDetach(unittest.TestCase):
    def setUp(self):
        # Create a fake instance object; it must expose:
        # - id
        # - compute_client
        # - virtual_network_client
        # - attach_nic / detach_nic methods (on the class under test)
        #
        # If your class name differs (e.g. OCIInstance), adjust below.
        self.instance = resources_oci.OCIInstance.__new__(resources_oci.OCIInstance)
        self.instance.id = "instance-ocid"

        self.compute_client = mock.Mock()
        self.virtual_network_client = mock.Mock()

        self.instance.compute_client = self.compute_client
        self.instance.virtual_network_client = self.virtual_network_client

    @mock.patch("os_tests.libs.resources_oci.oci.wait_until")
    @mock.patch("os_tests.libs.resources_oci.CreateVnicDetails")
    @mock.patch("os_tests.libs.resources_oci.AttachVnicDetails")
    def test_attach_nic_success_sets_ids_and_returns_true(
        self,
        mock_attach_vnic_details_cls,
        mock_create_vnic_details_cls,
        mock_wait_until,
    ):
        nic = DummyNIC()

        # Prepare fake response from attach_vnic
        fake_vnic_attachment = mock.Mock()
        fake_vnic_attachment.id = "vnic-attachment-ocid"
        fake_vnic_attachment.vnic_id = "vnic-ocid"

        fake_resp = mock.Mock()
        fake_resp.data = fake_vnic_attachment
        self.compute_client.attach_vnic.return_value = fake_resp

        # The wait_until mock just returns the same response
        mock_wait_until.return_value = fake_resp

        result = self.instance.attach_nic(nic, wait=True, timeout=120)

        # Verify AttachVnicDetails/CreateVnicDetails construction
        mock_create_vnic_details_cls.assert_called_once()
        create_call_kwargs = mock_create_vnic_details_cls.call_args.kwargs
        self.assertEqual(create_call_kwargs["subnet_id"], nic.subnet_id)
        self.assertFalse(create_call_kwargs["assign_public_ip"])
        self.assertEqual(create_call_kwargs["display_name"], nic.tag)
        self.assertEqual(create_call_kwargs["freeform_tags"], {"Name": nic.tag})

        mock_attach_vnic_details_cls.assert_called_once()
        attach_call_kwargs = mock_attach_vnic_details_cls.call_args.kwargs
        self.assertEqual(attach_call_kwargs["instance_id"], self.instance.id)
        self.assertEqual(
            attach_call_kwargs["create_vnic_details"],
            mock_create_vnic_details_cls.return_value,
        )

        self.compute_client.attach_vnic.assert_called_once_with(
            mock_attach_vnic_details_cls.return_value
        )
        mock_wait_until.assert_called_once()

        # State should be updated
        self.assertEqual(nic.vnic_attachment_id, "vnic-attachment-ocid")
        self.assertEqual(nic.id, "vnic-ocid")
        self.assertTrue(result)

    @mock.patch("os_tests.libs.resources_oci.oci.wait_until")
    @mock.patch("os_tests.libs.resources_oci.CreateVnicDetails")
    @mock.patch("os_tests.libs.resources_oci.AttachVnicDetails")
    def test_attach_nic_returns_false_on_error(
        self,
        mock_attach_vnic_details_cls,
        mock_create_vnic_details_cls,
        mock_wait_until,
    ):
        nic = DummyNIC()

        # Simulate an exception during attach_vnic
        self.compute_client.attach_vnic.side_effect = Exception("attach error")

        result = self.instance.attach_nic(nic, wait=True, timeout=120)

        self.assertFalse(result)
        # nic.ids must not be set on failure
        self.assertIsNone(nic.vnic_attachment_id)
        self.assertIsNone(nic.id)

        # Now simulate attach_vnic success but wait_until failing
        self.compute_client.attach_vnic.side_effect = None
        fake_vnic_attachment = mock.Mock()
        fake_vnic_attachment.id = "vnic-attachment-ocid"
        fake_vnic_attachment.vnic_id = "vnic-ocid"
        fake_resp = mock.Mock()
        fake_resp.data = fake_vnic_attachment
        self.compute_client.attach_vnic.return_value = fake_resp

        mock_wait_until.reset_mock()
        mock_wait_until.side_effect = Exception("wait error")

        result = self.instance.attach_nic(nic, wait=True, timeout=120)
        self.assertFalse(result)
        self.assertIsNone(nic.vnic_attachment_id)
        self.assertIsNone(nic.id)

    def test_detach_nic_resolves_missing_vnic_attachment_and_clears_state(self):
        nic = DummyNIC()
        # NIC has vnic id, but missing vnic_attachment_id
        nic.id = "vnic-ocid"
        nic.vnic_attachment_id = None

        # Mock list_vnic_attachments to return one matching vnic_id
        vnic_attachment = mock.Mock()
        vnic_attachment.id = "vnic-attachment-ocid"
        vnic_attachment.vnic_id = nic.id

        list_resp = mock.Mock()
        list_resp.data = [vnic_attachment]
        self.compute_client.list_vnic_attachments.return_value = list_resp

        # Mock detach_vnic / wait_until flow
        detach_resp = mock.Mock()
        detach_resp.data = vnic_attachment
        self.compute_client.detach_vnic.return_value = detach_resp

        with mock.patch("os_tests.libs.resources_oci.oci.wait_until") as mock_wait:
            mock_wait.return_value = detach_resp
            result = self.instance.detach_nic(nic, wait=True, timeout=120)

        self.assertTrue(result)
        self.compute_client.list_vnic_attachments.assert_called_once()
        self.compute_client.detach_vnic.assert_called_once_with(
            vnic_attachment.id
        )

        # State must be cleared
        self.assertIsNone(nic.vnic_attachment_id)
        self.assertIsNone(nic.id)

    def test_detach_nic_no_attached_vnic(self):
        nic = DummyNIC()
        nic.id = "vnic-ocid"
        nic.vnic_attachment_id = None

        # list_vnic_attachments returns empty list -> no attached vnic
        list_resp = mock.Mock()
        list_resp.data = []
        self.compute_client.list_vnic_attachments.return_value = list_resp

        result = self.instance.detach_nic(nic, wait=True, timeout=120)

        self.assertFalse(result)
        self.compute_client.detach_vnic.assert_not_called()
        # Ensure state isn't changed unexpectedly
        self.assertIsNone(nic.vnic_attachment_id)
        self.assertEqual(nic.id, "vnic-ocid")


class TestOCINICLifecycle(unittest.TestCase):
    def setUp(self):
        # OCINIC is assumed to be the NIC resource class; if the name differs, adjust accordingly.
        self.nic = resources_oci.OCINIC.__new__(resources_oci.OCINIC)
        self.nic.id = None
        self.nic.vnic_attachment_id = None
        self.nic.subnet_id = "subnet-ocid"
        self.nic.tag = "nic-tag"

        # OCINIC.create/delete are assumed to delegate to an instance object's attach_nic/detach_nic.
        self.instance = mock.Mock()
        self.nic.instance = self.instance

    def test_is_exist_and_is_free_with_no_ids(self):
        self.nic.id = None
        self.nic.vnic_attachment_id = None

        self.assertFalse(self.nic.is_exist())
        self.assertTrue(self.nic.is_free())

    def test_is_exist_and_is_free_with_ids(self):
        self.nic.id = "vnic-ocid"
        self.nic.vnic_attachment_id = "vnic-attachment-ocid"

        self.assertTrue(self.nic.is_exist())
        self.assertFalse(self.nic.is_free())

    def test_create_calls_attach_nic_and_updates_state(self):
        self.instance.attach_nic.return_value = True

        result = self.nic.create()

        self.assertTrue(result)
        self.instance.attach_nic.assert_called_once_with(self.nic, wait=True)

    def test_create_propagates_failure(self):
        self.instance.attach_nic.return_value = False

        result = self.nic.create()

        self.assertFalse(result)
        self.instance.attach_nic.assert_called_once_with(self.nic, wait=True)

    def test_delete_calls_detach_nic_and_clears_state(self):
        self.nic.id = "vnic-ocid"
        self.nic.vnic_attachment_id = "vnic-attachment-ocid"

        self.instance.detach_nic.return_value = True

        result = self.nic.delete()

        self.assertTrue(result)
        self.instance.detach_nic.assert_called_once_with(self.nic, wait=True)
        self.assertIsNone(self.nic.id)
        self.assertIsNone(self.nic.vnic_attachment_id)

    def test_get_state_reflects_ids(self):
        # The exact semantics of get_state may differ; this test assumes:
        # - when both id and vnic_attachment_id are None => "free" or similar
        # - when both are set => "in-use" or similar
        #
        # Adjust the expected values to match your implementation.
        self.nic.id = None
        self.nic.vnic_attachment_id = None
        state_free = self.nic.get_state()
        self.assertIn(state_free.lower(), ("free", "available", "detached"))

        self.nic.id = "vnic-ocid"
        self.nic.vnic_attachment_id = "vnic-attachment-ocid"
        state_used = self.nic.get_state()
        self.assertIn(state_used.lower(), ("in-use", "attached", "busy"))


if __name__ == "__main__":
    unittest.main()

The proposed tests assume specific API shapes that you may need to align with your actual implementation:

  1. Class names and locations:

    • Replace resources_oci.OCIInstance with the real instance class that implements attach_nic / detach_nic.
    • Replace resources_oci.OCINIC with the actual NIC resource class that exposes create, delete, get_state, is_exist, and is_free.
  2. Imported OCI symbols:

    • The tests patch os_tests.libs.resources_oci.AttachVnicDetails, CreateVnicDetails, and oci.wait_until. Ensure resources_oci.py does from oci.core.models import AttachVnicDetails, CreateVnicDetails and imports oci at module scope so these patch targets are correct. If the imports are local or aliased, adjust the patch paths accordingly.
  3. OCINIC implementation details:

    • The tests assume:
      • OCINIC.create() delegates to self.instance.attach_nic(self, wait=True) and returns its boolean result.
      • OCINIC.delete() delegates to self.instance.detach_nic(self, wait=True), clears id and vnic_attachment_id on success, and returns the boolean result.
      • OCINIC.is_exist() returns True when id or vnic_attachment_id is set; False otherwise.
      • OCINIC.is_free() returns the logical inverse of is_exist().
      • OCINIC.get_state() returns different strings depending on whether id / vnic_attachment_id are None. Update the expected strings in the tests if your implementation uses different labels.
  4. detach_nic signature:

    • The tests call detach_nic(self, nic, wait=True, timeout=120) and expect it to:
      • Use nic.vnic_attachment_id when present, otherwise resolve it via compute_client.list_vnic_attachments.
      • Return False and avoid calling detach_vnic when no vnic attachment is found.
      • On success, clear both nic.vnic_attachment_id and nic.id.
    • If your implementation differs, adjust the assertions to match the final behavior.

Once you align the names and any minor semantic differences, these tests will give coverage for the new attach/detach orchestration and OCINIC lifecycle behavior described in your review comment.

freeform_tags={'Name': nic.tag}
)
)
response = self.compute_client.attach_vnic(attach_vnic_details)
nic.vnic_attachment_id = response.data.id
if wait:
get_response = oci.wait_until(
self.compute_client,
self.compute_client.get_vnic_attachment(nic.vnic_attachment_id),
'lifecycle_state',
'ATTACHED',
max_wait_seconds=timeout
)
vnic_attachment = get_response.data
nic.id = vnic_attachment.vnic_id
LOG.info("nic attached, vnic_id: {}, attachment_id: {}".format(nic.id, nic.vnic_attachment_id))
return True
except Exception as err:
LOG.error("Failed to attach nic: {}".format(err))
return False

def detach_nic(self, nic, wait=True, force=False):
raise NotImplementedError
try:
if not nic.vnic_attachment_id:
LOG.info("No vnic_attachment_id found, searching...")
vnic_attachments = self.compute_client.list_vnic_attachments(
compartment_id=self.compartment_id,
instance_id=self.id
).data
for va in vnic_attachments:
if va.vnic_id == nic.id and va.lifecycle_state == 'ATTACHED':
nic.vnic_attachment_id = va.id
break
if not nic.vnic_attachment_id:
LOG.info("No attached vnic found to detach")
return False
LOG.info("try to detach nic {} from {}".format(nic.vnic_attachment_id, self.id))
self.compute_client.detach_vnic(nic.vnic_attachment_id)
if wait:
oci.wait_until(
self.compute_client,
self.compute_client.get_vnic_attachment(nic.vnic_attachment_id),
'lifecycle_state',
'DETACHED',
max_wait_seconds=120,
succeed_on_not_found=True
)
nic.vnic_attachment_id = None
nic.id = None
LOG.info("nic detached successfully")
return True
except Exception as err:
LOG.error("Failed to detach nic: {}".format(err))
return False

def is_paused(self):
raise NotImplementedError
Expand Down Expand Up @@ -524,11 +583,20 @@ def __init__(self, params):
self.type = None

def is_free(self):
self.volume = self.blockstorage_client.get_volume(self.id).data
if self.volume.lifecycle_state == 'AVAILABLE':
try:
volume_attachments = self.compute_client.list_volume_attachments(
compartment_id=self.compartment_id,
volume_id=self.id
).data
for va in volume_attachments:
if va.lifecycle_state in ['ATTACHING', 'ATTACHED']:
LOG.info("{} volume is attached (state: {})".format(self.id, va.lifecycle_state))
return False
Comment on lines 585 to +594
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Add tests for the new OCIVolume is_free behavior based on volume attachments instead of volume lifecycle state.

This is a significant behavior change that can affect volume deletion/reuse, so it should be covered by unit tests (with a mocked compute_client). Please add tests for at least:

  • No attachments ⇒ is_free() returns True.
  • An attachment in ATTACHING or ATTACHEDFalse.
  • Attachments only in non-attached states (e.g. DETACHED, DETACHING) ⇒ confirm and assert the intended result.
  • list_volume_attachments raising an exception ⇒ logs and returns True.

That will make the edge-case behavior (errors, multiple attachments, transient states) explicit and protect against regressions.

Suggested implementation:

import types

import pytest

from os_tests.libs.resources_oci import OCIVolume


class FakeVolumeAttachmentsResponse:
    def __init__(self, attachments):
        # OCI SDK returns an object with a `.data` attribute that is an iterable of attachments
        self.data = attachments


class FakeAttachment:
    def __init__(self, lifecycle_state):
        self.lifecycle_state = lifecycle_state


class FakeComputeClient:
    def __init__(self, attachments=None, side_effect=None):
        """
        :param attachments: iterable of FakeAttachment, returned when list_volume_attachments is called
        :param side_effect: exception to raise when list_volume_attachments is called
        """
        self._attachments = attachments or []
        self._side_effect = side_effect
        self.list_volume_attachments_calls = []

    def list_volume_attachments(self, compartment_id, volume_id):
        # Capture calls so we can assert the method is invoked with expected parameters
        self.list_volume_attachments_calls.append(
            {"compartment_id": compartment_id, "volume_id": volume_id}
        )
        if self._side_effect:
            raise self._side_effect
        return FakeVolumeAttachmentsResponse(self._attachments)


def _make_volume_with_compute_client(compute_client, compartment_id="ocid1.compartment.oc1..test", volume_id="ocid1.volume.oc1..test"):
    """
    Create an OCIVolume instance bypassing its __init__ so tests do not depend on
    the constructor signature.
    """
    vol = OCIVolume.__new__(OCIVolume)  # type: ignore
    vol.compute_client = compute_client
    vol.compartment_id = compartment_id
    vol.id = volume_id
    return vol


def test_is_free_returns_true_when_no_attachments(monkeypatch, caplog):
    compute_client = FakeComputeClient(attachments=[])
    volume = _make_volume_with_compute_client(compute_client)

    with caplog.at_level("INFO"):
        assert volume.is_free() is True

    # Ensure the compute_client was used to query attachments
    assert compute_client.list_volume_attachments_calls == [
        {"compartment_id": volume.compartment_id, "volume_id": volume.id}
    ]
    # Optional: confirm log message indicating volume is free
    assert any("volume is free" in msg for msg in caplog.messages)


@pytest.mark.parametrize("state", ["ATTACHING", "ATTACHED"])
def test_is_free_returns_false_when_attachment_in_attaching_or_attached(monkeypatch, caplog, state):
    attachments = [FakeAttachment(lifecycle_state=state)]
    compute_client = FakeComputeClient(attachments=attachments)
    volume = _make_volume_with_compute_client(compute_client)

    with caplog.at_level("INFO"):
        assert volume.is_free() is False

    assert compute_client.list_volume_attachments_calls == [
        {"compartment_id": volume.compartment_id, "volume_id": volume.id}
    ]
    # Confirm we logged that the volume is attached
    assert any("volume is attached" in msg for msg in caplog.messages)


def test_is_free_true_when_attachments_only_in_non_attached_states(monkeypatch, caplog):
    # States such as DETACHED and DETACHING should *not* be considered attached by the current logic
    attachments = [
        FakeAttachment(lifecycle_state="DETACHED"),
        FakeAttachment(lifecycle_state="DETACHING"),
    ]
    compute_client = FakeComputeClient(attachments=attachments)
    volume = _make_volume_with_compute_client(compute_client)

    with caplog.at_level("INFO"):
        assert volume.is_free() is True

    assert compute_client.list_volume_attachments_calls == [
        {"compartment_id": volume.compartment_id, "volume_id": volume.id}
    ]
    # Confirm we log the volume as free
    assert any("volume is free" in msg for msg in caplog.messages)


def test_is_free_true_when_list_volume_attachments_raises(monkeypatch, caplog):
    error = RuntimeError("simulated failure")
    compute_client = FakeComputeClient(side_effect=error)
    volume = _make_volume_with_compute_client(compute_client)

    with caplog.at_level("INFO"):
        assert volume.is_free() is True

    assert compute_client.list_volume_attachments_calls == [
        {"compartment_id": volume.compartment_id, "volume_id": volume.id}
    ]
    # Ensure the failure is logged
    assert any("Failed to check volume attachment" in msg for msg in caplog.messages)
  1. If your test suite uses a different directory layout (e.g. tests/unit/ instead of os_tests/tests/), adjust the file_path accordingly.
  2. If pytest is not the test runner in this project, you may need to adapt the test style (e.g. to unittest) and the logging assertions (caplog) to match the existing test framework.
  3. Ensure os_tests is importable in the test environment (it may require configuring PYTHONPATH or using your project’s standard test bootstrap).

LOG.info("{} volume is free".format(self.id))
return True
except Exception as err:
LOG.info("Failed to check volume attachment: {}".format(err))
return True
LOG.info("{} volume is in {} state".format(self.id, self.volume.lifecycle_state))
return False

def create(self, wait=True):
try:
Expand Down Expand Up @@ -625,8 +693,11 @@ def modify_disk_size(self, os_disk_size=10, expand_num=10):
class OCINIC(NetworkResource):
'''
OCI Network class
On OCI, secondary VNICs are created and attached in one API call
(attach_vnic) and detached and deleted in one call (detach_vnic).
So create() just marks the NIC as ready, and delete() is a no-op
since detach_vnic already removes the VNIC.
'''
_vnic = None

def __init__(self, params):
super(OCINIC, self).__init__(params)
Expand All @@ -639,32 +710,39 @@ def __init__(self, params):

self.compartment_id = params.get('compartment_id')
self.subnet_id = params.get('subnet_id') or params.get('subnet_id_ipv4')
self.tag = params.get('tagname') or 'os_tests_network_oci'
self.tag = params.get('tagname') or 'os_tests_nic_oci'
self.id = None
self.vnic_attachment_id = None

def show(self):
if self.is_exist():
LOG.info("VNIC ID: {}".format(self.id))

def create(self):
LOG.info("VNIC creation is handled during instance launch on OCI")
raise NotImplementedError
LOG.info("OCINIC created (will be provisioned on attach_nic)")
return True

def delete(self, wait=True):
LOG.info("VNIC deletion is handled during instance termination on OCI")
raise NotImplementedError
LOG.info("OCINIC delete (already handled by detach_vnic)")
self.id = None
self.vnic_attachment_id = None
return True

def get_state(self):
if not self.id:
return 'unknown'
try:
state = 'unknown'
vnic = self.network_client.get_vnic(self.id).data
state = vnic.lifecycle_state
LOG.info("vnic state: {}".format(state))
except Exception as err:
return state
return state
except Exception as err:
LOG.info("Failed to get vnic state: {}".format(err))
return 'unknown'

def is_exist(self):
if not self.id:
return False
try:
vnic = self.network_client.get_vnic(self.id).data
if vnic.lifecycle_state in ['TERMINATING', 'TERMINATED']:
Expand All @@ -677,10 +755,15 @@ def is_exist(self):
return False

def is_free(self):
if not self.id:
return True
if not self.vnic_attachment_id:
return True
try:
vnic = self.network_client.get_vnic(self.id).data
if vnic.lifecycle_state == 'AVAILABLE':
return True
except Exception as err:
LOG.info("Failed to check VNIC state: {}".format(err))
return False
va = self.compute_client.get_vnic_attachment(self.vnic_attachment_id).data
if va.lifecycle_state in ['ATTACHED', 'ATTACHING']:
LOG.info("{} nic is in use (state: {})".format(self.id, va.lifecycle_state))
return False
except Exception:
pass
return True
3 changes: 2 additions & 1 deletion os_tests/libs/utils_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,10 @@ def init_provider(params=None):
if not net.exists():
net.create()
if 'oci' in provider:
from .resources_oci import OCIVM,OCIVolume
from .resources_oci import OCIVM,OCIVolume,OCINIC
vms.extend([OCIVM(params),OCIVM(params)])
disks.append(OCIVolume(params))
nics.extend([OCINIC(params),OCINIC(params),OCINIC(params),OCINIC(params)])
if params.get('instance_ids'):
vms[0].id = params.get('instance_ids').split(',')[0]
if ',' in params.get('instance_ids'):
Expand Down
25 changes: 14 additions & 11 deletions os_tests/tests/test_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -1111,17 +1111,20 @@ def test_stop_start_vm(self):
self.assertTrue(self.vm.is_stopped(),
"Stop VM error: VM status is not SHUTOFF")
self._start_vm_and_check()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Consider making the OCI-specific behavior explicit in the test semantics (skip/xfail or separate test) so the intent is clearer.

Right now this test no longer exercises the guest-initiated shutdown path on OCI (it just logs and returns), but the name and structure still imply that shutdown behavior is being validated for all providers. To avoid this mismatch, either mark it skipped/xfail on OCI, or split it into provider-specific tests (e.g. a test_stop_start_vm_guest_shutdown with a conditional skip for OCI) so the verified behavior per provider is unambiguous.

Suggested implementation:

        if self.vm.provider == 'oci':
            pytest.skip("Guest-initiated shutdown does not auto-stop OCI instances; skipping shutdown behavior validation on this provider")
        else:
            utils_lib.run_cmd(self, 'sudo shutdown now')
            for count in utils_lib.iterate_timeout(180,
                                                   "Timed out waiting for VM to stop.",
                                                   wait=0):
                time.sleep(30)
                self.log.info(f"The {count} time 30 second waiting\n"
                              f"The vm status is {self.vm.get_state()}")
                if self.vm.is_stopped():
                    break
            time.sleep(120)
            self._start_vm_and_check()
  1. Ensure pytest is imported at the top of os_tests/tests/test_lifecycle.py, e.g. add import pytest alongside the other imports if it is not already present.
  2. Optionally, you may want to reflect this behavior in the test name or docstring (if present) to note that OCI is explicitly skipped because guest-initiated shutdown semantics differ.

utils_lib.run_cmd(self, 'sudo shutdown now')
for count in utils_lib.iterate_timeout(180,
"Timed out waiting for VM to stop.",
wait=0):
time.sleep(30)
self.log.info(f"The {count} time 30 second waiting\n"
f"The vm status is {self.vm.get_state()}")
if self.vm.is_stopped():
break
time.sleep(120)
self._start_vm_and_check()
if self.vm.provider == 'oci':
self.log.info("Skip 'sudo shutdown now' test on OCI as guest-initiated shutdown does not auto-stop the instance")
else:
utils_lib.run_cmd(self, 'sudo shutdown now')
for count in utils_lib.iterate_timeout(180,
"Timed out waiting for VM to stop.",
wait=0):
time.sleep(30)
self.log.info(f"The {count} time 30 second waiting\n"
f"The vm status is {self.vm.get_state()}")
if self.vm.is_stopped():
break
time.sleep(120)
self._start_vm_and_check()

def _update_kernel_args(self, boot_param_required):
if utils_lib.is_ostree_system(self):
Expand Down
Loading