diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 3c11174f..d22dca8d 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -26,10 +26,10 @@ jobs:
       - name: Check out code
         uses: actions/checkout@v4
 
-      - name: Set up Python 3.10.14
+      - name: Set up Python 3.11.9
         uses: actions/setup-python@v5
         with:
-          python-version: 3.10.14
+          python-version: 3.11.9
 
       - name: Install ansible-base (v${{ matrix.ansible }})
         run: pip install https://github.com/ansible/ansible/archive/v${{ matrix.ansible }}.tar.gz --disable-pip-version-check
@@ -51,12 +51,12 @@ jobs:
     strategy:
       matrix:
         ansible: [2.14.15, 2.15.10, 2.16.5, 2.17.8]
-        python: ['3.10']
+        python: ['3.11']
     steps:
-      - name: Set up Python 3.10.14
+      - name: Set up Python 3.11.9
         uses: actions/setup-python@v5
         with:
-          python-version: 3.10.14
+          python-version: 3.11.9
 
       - name: Upgrade pip
         run: |
@@ -77,3 +77,51 @@ jobs:
       - name: Run sanity tests
         run: ansible-test sanity --docker --python ${{matrix.python}} -v --color --truncate 0
         working-directory: /home/runner/.ansible/collections/ansible_collections/cisco/nac_dc_vxlan
+
+  unit:
+    name: Unit (Ⓐ${{ matrix.ansible }})
+    needs:
+      - build
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        ansible: [2.14.15, 2.15.10, 2.16.5, 2.17.8]
+        python: ['3.11']
+    steps:
+      - name: Set up Python 3.11.9
+        uses: actions/setup-python@v5
+        with:
+          python-version: 3.11.9
+
+      - name: Upgrade pip
+        run: |
+          pip install --upgrade pip
+
+      - name: Install ansible-base (v${{ matrix.ansible }})
+        run: pip install https://github.com/ansible/ansible/archive/v${{ matrix.ansible }}.tar.gz --disable-pip-version-check
+
+      - name: Download migrated collection artifacts
+        uses: actions/download-artifact@v4
+        with:
+          name: collection-${{ matrix.ansible }}
+          path: .cache/collection-tarballs
+
+      - name: Install iac-validate (v0.2.7)
+        run: pip install iac-validate==0.2.7
+
+      - name: Install coverage (v7.9.2)
+        run: pip install coverage==7.9.2
+
+      - name: Install pytest (v8.4.1)
+        run: pip install pytest==8.4.1
+
+      - name: Install the collection tarball
+        run: ansible-galaxy collection install .cache/collection-tarballs/*.tar.gz
+
+      - name: Run unit tests
+        run: coverage run --source=. -m pytest tests/unit/. -vvvv
+        working-directory: /home/runner/.ansible/collections/ansible_collections/cisco/nac_dc_vxlan
+
+      - name: Generate coverage report
+        run: coverage report --include="plugins/*"
+        working-directory: /home/runner/.ansible/collections/ansible_collections/cisco/nac_dc_vxlan
diff --git a/plugins/filter/version_compare.py b/plugins/filter/version_compare.py
index 8d739ed9..17f12bd8 100644
--- a/plugins/filter/version_compare.py
+++ b/plugins/filter/version_compare.py
@@ -87,8 +87,10 @@ def version_compare(version1, version2, op):
         version1 (str): The first version string to compare.
         version2 (str): The second version string to compare.
         op (str): The comparison operator as a string. Supported: '==', '!=', '>', '>=', '<', '<='.
+
     Returns:
         bool: The result of the comparison.
+
     Raises:
         AnsibleError: If the 'packaging' library is not installed.
         AnsibleFilterTypeError: If the version arguments are not strings.
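The new unit job above runs the tests introduced below through pytest and coverage. As a minimal local sketch, not part of this change set, the filter can also be called directly; this assumes the interpreter is started from the collection repository root (so that plugins.filter.version_compare is importable, the same way the unit tests arrange it via sys.path) and that the packaging library is installed. The expected results mirror the assertions in tests/unit/plugins/filter/test_version_compare.py.

    # Hypothetical smoke check; run from the repository root.
    # Requires the 'packaging' library, otherwise the filter raises AnsibleError.
    from plugins.filter.version_compare import version_compare

    assert version_compare("12.2.2", "12.2.1", ">") is True
    assert version_compare("12.2.1", "12.2.2", ">=") is False
    print("version_compare smoke check passed")
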
diff --git a/tests/unit/plugins/filter/__init__.py b/tests/unit/plugins/filter/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/plugins/filter/test_version_compare.py b/tests/unit/plugins/filter/test_version_compare.py new file mode 100644 index 00000000..45915c30 --- /dev/null +++ b/tests/unit/plugins/filter/test_version_compare.py @@ -0,0 +1,433 @@ +# Copyright (c) 2025 Cisco Systems, Inc. and its affiliates +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +# the Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# +# SPDX-License-Identifier: MIT + +""" +Unit tests for the version_compare filter plugin. +""" + +import sys +import os +import pytest +from unittest.mock import Mock, patch + +from jinja2.runtime import Undefined +from jinja2.exceptions import UndefinedError + +from ansible.errors import AnsibleError, AnsibleFilterError, AnsibleFilterTypeError + +# Import the actual version_compare module +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..', '..')) + +from plugins.filter.version_compare import version_compare, FilterModule + + +class TestVersionCompareFunction: + """Test the version_compare function directly.""" + + def test_version_compare_missing_packaging_library(self): + """Test behavior when packaging library is not available.""" + # Mock the import error scenario + with patch('plugins.filter.version_compare.PACKAGING_LIBRARY_IMPORT_ERROR', ImportError("No module named 'packaging'")): + with pytest.raises(AnsibleError, match="packaging must be installed to use this filter plugin"): + version_compare("1.0.0", "1.0.0", "==") + + def test_version_compare_missing_packaging_library_with_module_reload(self): + """Test the ImportError handling during module import.""" + # This tests the import block and exception handling during module loading + import builtins + import importlib + + # Store the original import function + original_import = builtins.__import__ + + def mock_import(name, *args, **kwargs): + if name == 'packaging.version': + raise ImportError("No module named 'packaging'") + return original_import(name, *args, **kwargs) + + # Mock the import and force module reload + with patch.object(builtins, '__import__', side_effect=mock_import): + # Remove the module from cache to force reimport + if 'plugins.filter.version_compare' in sys.modules: + del sys.modules['plugins.filter.version_compare'] + + # Import the module which should trigger the ImportError handling + import plugins.filter.version_compare as vc_module + + # Verify that the ImportError was captured + 
assert vc_module.PACKAGING_LIBRARY_IMPORT_ERROR is not None + assert isinstance(vc_module.PACKAGING_LIBRARY_IMPORT_ERROR, ImportError) + + # Test that version_compare raises AnsibleError when packaging is missing + with pytest.raises(AnsibleError, match="packaging must be installed to use this filter plugin"): + vc_module.version_compare("1.0.0", "1.0.0", "==") + + # Clean up - reimport the module normally + if 'plugins.filter.version_compare' in sys.modules: + del sys.modules['plugins.filter.version_compare'] + import plugins.filter.version_compare # Reload normally + + def test_version_compare_equal(self): + """Test equal comparison.""" + assert version_compare("1.0.0", "1.0.0", "==") is True + assert version_compare("1.0.0", "1.0.1", "==") is False + assert version_compare("2.1.0", "2.1.0", "==") is True + + def test_version_compare_not_equal(self): + """Test not equal comparison.""" + assert version_compare("1.0.0", "1.0.1", "!=") is True + assert version_compare("1.0.0", "1.0.0", "!=") is False + assert version_compare("2.1.0", "2.0.0", "!=") is True + + def test_version_compare_greater_than(self): + """Test greater than comparison.""" + assert version_compare("1.0.1", "1.0.0", ">") is True + assert version_compare("1.0.0", "1.0.1", ">") is False + assert version_compare("2.0.0", "1.9.9", ">") is True + assert version_compare("1.0.0", "1.0.0", ">") is False + + def test_version_compare_greater_than_or_equal(self): + """Test greater than or equal comparison.""" + assert version_compare("1.0.1", "1.0.0", ">=") is True + assert version_compare("1.0.0", "1.0.0", ">=") is True + assert version_compare("1.0.0", "1.0.1", ">=") is False + assert version_compare("2.0.0", "1.9.9", ">=") is True + + def test_version_compare_less_than(self): + """Test less than comparison.""" + assert version_compare("1.0.0", "1.0.1", "<") is True + assert version_compare("1.0.1", "1.0.0", "<") is False + assert version_compare("1.9.9", "2.0.0", "<") is True + assert version_compare("1.0.0", "1.0.0", "<") is False + + def test_version_compare_less_than_or_equal(self): + """Test less than or equal comparison.""" + assert version_compare("1.0.0", "1.0.1", "<=") is True + assert version_compare("1.0.0", "1.0.0", "<=") is True + assert version_compare("1.0.1", "1.0.0", "<=") is False + assert version_compare("1.9.9", "2.0.0", "<=") is True + + def test_version_compare_complex_versions(self): + """Test with complex version strings.""" + assert version_compare("1.0.0-alpha.1", "1.0.0-alpha.2", "<") is True + assert version_compare("1.0.0-rc.1", "1.0.0", "<") is True + assert version_compare("2.0.0-beta.1", "2.0.0-alpha.1", ">") is True + assert version_compare("1.0.0.dev1", "1.0.0", "<") is True + + def test_version_compare_with_build_metadata(self): + """Test versions with build metadata.""" + # packaging.version treats build metadata as part of the version for equality + result = version_compare("1.0.0+build.1", "1.0.0+build.2", "==") + assert result is False # Different build metadata means not equal + + # But the base version comparison should still work + assert version_compare("1.0.0+build.1", "1.0.1+build.1", "<") is True + + def test_version_compare_different_formats(self): + """Test different version formats.""" + assert version_compare("1.0", "1.0.0", "==") is True + assert version_compare("1", "1.0.0", "==") is True + assert version_compare("1.0.0.0", "1.0.0", "==") is True + + def test_version_compare_version1_type_error(self): + """Test version1 type validation.""" + with 
pytest.raises(AnsibleFilterTypeError, match="Can only check string versions, however version1 is"): + version_compare(123, "1.0.0", "==") + + with pytest.raises(AnsibleFilterTypeError, match="Can only check string versions, however version1 is"): + version_compare([], "1.0.0", "==") + + with pytest.raises(AnsibleFilterTypeError, match="Can only check string versions, however version1 is"): + version_compare({"version": "1.0.0"}, "1.0.0", "==") + + def test_version_compare_version2_type_error(self): + """Test version2 type validation.""" + with pytest.raises(AnsibleFilterTypeError, match="Can only check string versions, however version2 is"): + version_compare("1.0.0", 123, "==") + + with pytest.raises(AnsibleFilterTypeError, match="Can only check string versions, however version2 is"): + version_compare("1.0.0", [], "==") + + with pytest.raises(AnsibleFilterTypeError, match="Can only check string versions, however version2 is"): + version_compare("1.0.0", {"version": "1.0.0"}, "==") + + def test_version_compare_operator_validation(self): + """Test operator validation.""" + # Test unsupported operator + with pytest.raises(AnsibleFilterError, match="Unsupported operator"): + version_compare("1.0.0", "1.0.0", "===") + + with pytest.raises(AnsibleFilterError, match="Unsupported operator"): + version_compare("1.0.0", "1.0.0", "~=") + + def test_version_compare_operator_type_and_content_validation(self): + """Test operator validation with type and content checks.""" + # Test with integer operator (not string_types or Undefined) + with pytest.raises(AnsibleFilterError, match="Unsupported operator"): + version_compare("1.0.0", "1.0.0", 123) + + # Test with list operator (not string_types or Undefined) + with pytest.raises(AnsibleFilterError, match="Unsupported operator"): + version_compare("1.0.0", "1.0.0", []) + + # Test with dict operator (not string_types or Undefined) + with pytest.raises(AnsibleFilterError, match="Unsupported operator"): + version_compare("1.0.0", "1.0.0", {}) + + def test_version_compare_undefined_version1(self): + """Test with undefined version1.""" + undefined_mock = Mock(spec=Undefined) + + # Test that it handles Undefined properly by raising AnsibleFilterError + with pytest.raises(AnsibleFilterError): + version_compare(undefined_mock, "1.0.0", "==") + + def test_version_compare_undefined_version2(self): + """Test with undefined version2.""" + undefined_mock = Mock(spec=Undefined) + + # Test that it handles Undefined properly by raising AnsibleFilterError + with pytest.raises(AnsibleFilterError): + version_compare("1.0.0", undefined_mock, "==") + + def test_version_compare_undefined_operator(self): + """Test with undefined operator.""" + undefined_mock = Mock(spec=Undefined) + + # Test that it handles string-like Undefined properly + with pytest.raises(AnsibleFilterError): + version_compare("1.0.0", "1.0.0", undefined_mock) + + def test_version_compare_invalid_version_strings(self): + """Test with invalid version strings.""" + with pytest.raises(AnsibleFilterError, match="Unable handle version"): + version_compare("invalid-version", "1.0.0", "==") + + with pytest.raises(AnsibleFilterError, match="Unable handle version"): + version_compare("1.0.0", "invalid-version", "==") + + def test_version_compare_empty_version_strings(self): + """Test with empty version strings.""" + with pytest.raises(AnsibleFilterError, match="Unable handle version"): + version_compare("", "1.0.0", "==") + + with pytest.raises(AnsibleFilterError, match="Unable handle version"): + 
version_compare("1.0.0", "", "==") + + def test_version_compare_undefined_error_handling(self): + """Test UndefinedError handling.""" + # This test is covered by the test_version_compare_undefined_error_during_version_creation test + # which properly tests the UndefinedError handling in the version_compare function + pass + + def test_version_compare_undefined_error_during_version_creation(self): + """Test UndefinedError specifically during Version() creation.""" + # Import at module level to get the right reference + import plugins.filter.version_compare as vc_module + + # Mock the Version constructor at the module level + with patch.object(vc_module, 'Version') as mock_version: + # Make Version constructor raise UndefinedError + mock_version.side_effect = UndefinedError("Variable is undefined") + + # This should trigger the UndefinedError catch block which re-raises it + with pytest.raises(UndefinedError): + vc_module.version_compare("1.0.0", "1.0.0", "==") + + def test_version_compare_all_operators(self): + """Test all supported operators comprehensively.""" + operators = ['==', '!=', '>', '>=', '<', '<='] + + for op in operators: + # Test that each operator works without exception + result = version_compare("1.0.0", "1.0.0", op) + assert isinstance(result, bool) + + def test_version_compare_edge_cases(self): + """Test edge cases and boundary conditions.""" + # Test with very large version numbers + assert version_compare("999.999.999", "1.0.0", ">") is True + + # Test with zero versions + assert version_compare("0.0.0", "0.0.1", "<") is True + + # Test with single digit versions + assert version_compare("1", "2", "<") is True + + # Test with different number of version parts + assert version_compare("1.0", "1.0.0.0", "==") is True + + def test_version_compare_string_types_compatibility(self): + """Test compatibility with different string types.""" + # Test with unicode strings + assert version_compare("1.0.0", "1.0.0", "==") is True + + # Test with bytes (should fail appropriately) + with pytest.raises(AnsibleFilterTypeError): + version_compare(b"1.0.0", "1.0.0", "==") + + +class TestFilterModule: + """Test the FilterModule class.""" + + def test_filter_module_instantiation(self): + """Test FilterModule can be instantiated.""" + filter_module = FilterModule() + assert filter_module is not None + + def test_filter_module_filters_method(self): + """Test filters method returns correct dictionary.""" + filter_module = FilterModule() + filters = filter_module.filters() + + assert isinstance(filters, dict) + assert "version_compare" in filters + assert filters["version_compare"] is version_compare + + def test_filter_module_integration(self): + """Test filter module integration with Ansible.""" + filter_module = FilterModule() + filters = filter_module.filters() + + # Test that the filter function works through the module + version_compare_func = filters["version_compare"] + result = version_compare_func("1.0.1", "1.0.0", ">") + assert result is True + + def test_filter_module_with_templar(self): + """Test integration with Ansible Templar.""" + filter_module = FilterModule() + filters = filter_module.filters() + + # Mock a basic templar environment + mock_templar = Mock() + mock_templar.environment = Mock() + mock_templar.environment.filters = {} + + # Add our filter to the mock environment + mock_templar.environment.filters.update(filters) + + # Verify the filter is available + assert "version_compare" in mock_templar.environment.filters + + def test_filter_module_error_propagation(self): + 
"""Test that errors are properly propagated through the filter module.""" + filter_module = FilterModule() + filters = filter_module.filters() + version_compare_func = filters["version_compare"] + + # Test that errors from the underlying function are propagated + with pytest.raises(AnsibleFilterTypeError): + version_compare_func(123, "1.0.0", "==") + + with pytest.raises(AnsibleFilterError): + version_compare_func("1.0.0", "1.0.0", "invalid_op") + + +class TestVersionCompareIntegration: + """Integration tests for version_compare filter.""" + + def test_version_compare_realistic_scenarios(self): + """Test realistic version comparison scenarios.""" + # Common software version comparisons + assert version_compare("12.2.2", "12.2.1", ">") is True + assert version_compare("12.2.2", "12.2.2", ">=") is True + assert version_compare("11.5.1", "12.0.0", "<") is True + + # NDFC version comparisons (from examples in docstring) + assert version_compare("12.2.2", "12.2.2", ">=") is True + assert version_compare("12.2.3", "12.2.2", ">=") is True + assert version_compare("12.2.1", "12.2.2", ">=") is False + + def test_version_compare_with_ansible_context(self): + """Test version_compare in a context similar to Ansible usage.""" + # Simulate Ansible variable context + version_vars = { + "current_version": "1.0.2", + "required_version": "1.0.1", + "comparison_op": ">" + } + + result = version_compare( + version_vars["current_version"], + version_vars["required_version"], + version_vars["comparison_op"] + ) + assert result is True + + def test_version_compare_error_messages(self): + """Test that error messages are informative.""" + # Test version1 type error message + try: + version_compare(123, "1.0.0", "==") + assert False, "Should have raised AnsibleFilterTypeError" + except AnsibleFilterTypeError as e: + assert "version1" in str(e) + assert "int" in str(e) + + # Test version2 type error message + try: + version_compare("1.0.0", 123, "==") + assert False, "Should have raised AnsibleFilterTypeError" + except AnsibleFilterTypeError as e: + assert "version2" in str(e) + assert "int" in str(e) + + # Test operator error message + try: + version_compare("1.0.0", "1.0.0", "invalid") + assert False, "Should have raised AnsibleFilterError" + except AnsibleFilterError as e: + assert "Unsupported operator" in str(e) + assert "invalid" in str(e) + + def test_version_compare_performance(self): + """Test performance with multiple version comparisons.""" + import time + + # Test that version comparisons are reasonably fast + start_time = time.time() + + for i in range(100): + version_compare(f"1.0.{i}", f"1.0.{i+1}", "<") + + elapsed_time = time.time() - start_time + assert elapsed_time < 1.0, f"Version comparisons took too long: {elapsed_time}s" + + def test_version_compare_memory_usage(self): + """Test that version comparisons don't cause memory issues.""" + # Test that we can do many comparisons without memory issues + for i in range(1000): + result = version_compare("1.0.0", "1.0.1", "<") + assert result is True + + def test_version_compare_documentation_examples(self): + """Test examples from the documentation.""" + # From EXAMPLES section + assert version_compare('1.0.2', '1.0.1', '>') is True + + # Test the conditional example scenario + ndfc_version = "12.2.2" + assert version_compare(ndfc_version, '12.2.2', '>=') is True + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/tests/unit/plugins/plugin_utils/__init__.py b/tests/unit/plugins/plugin_utils/__init__.py new file mode 100644 index 
00000000..e69de29b diff --git a/tests/unit/plugins/plugin_utils/test_data_model_keys.py b/tests/unit/plugins/plugin_utils/test_data_model_keys.py new file mode 100644 index 00000000..0498312f --- /dev/null +++ b/tests/unit/plugins/plugin_utils/test_data_model_keys.py @@ -0,0 +1,595 @@ +# Copyright (c) 2025 Cisco Systems, Inc. and its affiliates +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +# the Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# +# SPDX-License-Identifier: MIT + +""" +Unit tests for the data_model_keys plugin_utils module. +""" + +import sys +import os +import pytest + +# Add plugin_utils to Python path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..', 'plugins', 'plugin_utils')) + +try: + from data_model_keys import root_key, model_keys +except ImportError: + # Alternative import path for testing + import importlib.util + spec = importlib.util.spec_from_file_location( + "data_model_keys", + os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', 'plugins', 'plugin_utils', 'data_model_keys.py') + ) + data_model_keys_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(data_model_keys_module) + + root_key = data_model_keys_module.root_key + model_keys = data_model_keys_module.model_keys + + +class TestRootKey: + """Test the root_key constant.""" + + def test_root_key_value(self): + """Test that root_key is set to 'vxlan'.""" + assert root_key == 'vxlan' + + def test_root_key_type(self): + """Test that root_key is a string.""" + assert isinstance(root_key, str) + + +class TestModelKeysStructure: + """Test the overall structure of model_keys.""" + + def test_model_keys_is_dict(self): + """Test that model_keys is a dictionary.""" + assert isinstance(model_keys, dict) + + def test_model_keys_fabric_types(self): + """Test that model_keys contains all expected fabric types.""" + expected_fabric_types = ['VXLAN_EVPN', 'MSD', 'MCF', 'ISN', 'External'] + + for fabric_type in expected_fabric_types: + assert fabric_type in model_keys + assert isinstance(model_keys[fabric_type], dict) + + def test_model_keys_fabric_types_count(self): + """Test that model_keys contains exactly the expected number of fabric types.""" + assert len(model_keys) == 5 + + def test_model_keys_all_values_are_dicts(self): + """Test that all top-level values in model_keys are dictionaries.""" + for fabric_type, fabric_keys in model_keys.items(): + assert isinstance(fabric_keys, dict), f"model_keys['{fabric_type}'] should be a dict" + + +class TestVxlanEvpnKeys: + """Test the VXLAN_EVPN fabric type 
keys.""" + + def test_vxlan_evpn_global_keys(self): + """Test VXLAN_EVPN global keys.""" + vxlan_evpn = model_keys['VXLAN_EVPN'] + + # Test basic global key + assert 'global' in vxlan_evpn + assert vxlan_evpn['global'] == [root_key, 'global', 'KEY'] + + # Test global subkeys + assert 'global.dns_servers' in vxlan_evpn + assert vxlan_evpn['global.dns_servers'] == [root_key, 'global', 'dns_servers', 'LIST'] + + assert 'global.ntp_servers' in vxlan_evpn + assert vxlan_evpn['global.ntp_servers'] == [root_key, 'global', 'ntp_servers', 'LIST'] + + assert 'global.syslog_servers' in vxlan_evpn + assert vxlan_evpn['global.syslog_servers'] == [root_key, 'global', 'syslog_servers', 'LIST'] + + def test_vxlan_evpn_netflow_keys(self): + """Test VXLAN_EVPN netflow keys.""" + vxlan_evpn = model_keys['VXLAN_EVPN'] + + assert 'global.netflow' in vxlan_evpn + assert vxlan_evpn['global.netflow'] == [root_key, 'global', 'netflow', 'KEY'] + + assert 'global.netflow.exporter' in vxlan_evpn + assert vxlan_evpn['global.netflow.exporter'] == [root_key, 'global', 'netflow', 'exporter', 'LIST'] + + assert 'global.netflow.record' in vxlan_evpn + assert vxlan_evpn['global.netflow.record'] == [root_key, 'global', 'netflow', 'record', 'LIST'] + + assert 'global.netflow.monitor' in vxlan_evpn + assert vxlan_evpn['global.netflow.monitor'] == [root_key, 'global', 'netflow', 'monitor', 'LIST'] + + def test_vxlan_evpn_spanning_tree_keys(self): + """Test VXLAN_EVPN spanning tree keys.""" + vxlan_evpn = model_keys['VXLAN_EVPN'] + + assert 'global.spanning_tree' in vxlan_evpn + assert vxlan_evpn['global.spanning_tree'] == [root_key, 'global', 'spanning_tree', 'KEY'] + + def test_vxlan_evpn_underlay_keys(self): + """Test VXLAN_EVPN underlay keys.""" + vxlan_evpn = model_keys['VXLAN_EVPN'] + + assert 'underlay' in vxlan_evpn + assert vxlan_evpn['underlay'] == [root_key, 'underlay', 'KEY'] + + def test_vxlan_evpn_topology_keys(self): + """Test VXLAN_EVPN topology keys.""" + vxlan_evpn = model_keys['VXLAN_EVPN'] + + assert 'topology' in vxlan_evpn + assert vxlan_evpn['topology'] == [root_key, 'topology', 'KEY'] + + assert 'topology.edge_connections' in vxlan_evpn + assert vxlan_evpn['topology.edge_connections'] == [root_key, 'topology', 'edge_connections', 'LIST'] + + assert 'topology.fabric_links' in vxlan_evpn + assert vxlan_evpn['topology.fabric_links'] == [root_key, 'topology', 'fabric_links', 'LIST'] + + assert 'topology.switches' in vxlan_evpn + assert vxlan_evpn['topology.switches'] == [root_key, 'topology', 'switches', 'LIST'] + + assert 'topology.switches.freeform' in vxlan_evpn + assert vxlan_evpn['topology.switches.freeform'] == [root_key, 'topology', 'switches', 'freeform', 'LIST_INDEX'] + + assert 'topology.switches.interfaces' in vxlan_evpn + assert vxlan_evpn['topology.switches.interfaces'] == [root_key, 'topology', 'switches', 'interfaces', 'LIST_INDEX'] + + assert 'topology.vpc_peers' in vxlan_evpn + assert vxlan_evpn['topology.vpc_peers'] == [root_key, 'topology', 'vpc_peers', 'LIST'] + + def test_vxlan_evpn_overlay_keys(self): + """Test VXLAN_EVPN overlay keys.""" + vxlan_evpn = model_keys['VXLAN_EVPN'] + + assert 'overlay' in vxlan_evpn + assert vxlan_evpn['overlay'] == [root_key, 'overlay', 'KEY'] + + assert 'overlay.vrfs' in vxlan_evpn + assert vxlan_evpn['overlay.vrfs'] == [root_key, 'overlay', 'vrfs', 'LIST'] + + assert 'overlay.vrf_attach_groups' in vxlan_evpn + assert vxlan_evpn['overlay.vrf_attach_groups'] == [root_key, 'overlay', 'vrf_attach_groups', 'LIST'] + + assert 
'overlay.vrf_attach_groups.switches' in vxlan_evpn + assert vxlan_evpn['overlay.vrf_attach_groups.switches'] == [root_key, 'overlay', 'vrf_attach_groups', 'switches', 'LIST_INDEX'] + + assert 'overlay.networks' in vxlan_evpn + assert vxlan_evpn['overlay.networks'] == [root_key, 'overlay', 'networks', 'LIST'] + + assert 'overlay.network_attach_groups' in vxlan_evpn + assert vxlan_evpn['overlay.network_attach_groups'] == [root_key, 'overlay', 'network_attach_groups', 'LIST'] + + assert 'overlay.network_attach_groups.switches' in vxlan_evpn + assert vxlan_evpn['overlay.network_attach_groups.switches'] == [root_key, 'overlay', 'network_attach_groups', 'switches', 'LIST_INDEX'] + + def test_vxlan_evpn_overlay_extensions_keys(self): + """Test VXLAN_EVPN overlay extensions keys.""" + vxlan_evpn = model_keys['VXLAN_EVPN'] + + assert 'overlay_extensions' in vxlan_evpn + assert vxlan_evpn['overlay_extensions'] == [root_key, 'overlay_extensions', 'KEY'] + + assert 'overlay_extensions.route_control' in vxlan_evpn + assert vxlan_evpn['overlay_extensions.route_control'] == [root_key, 'overlay_extensions', 'route_control', 'KEY'] + + assert 'overlay_extensions.route_control.route_maps' in vxlan_evpn + assert vxlan_evpn['overlay_extensions.route_control.route_maps'] == [root_key, 'overlay_extensions', 'route_control', 'route_maps', 'LIST'] + + def test_vxlan_evpn_policy_keys(self): + """Test VXLAN_EVPN policy keys.""" + vxlan_evpn = model_keys['VXLAN_EVPN'] + + assert 'policy' in vxlan_evpn + assert vxlan_evpn['policy'] == [root_key, 'policy', 'KEY'] + + assert 'policy.policies' in vxlan_evpn + assert vxlan_evpn['policy.policies'] == [root_key, 'policy', 'policies', 'LIST'] + + assert 'policy.groups' in vxlan_evpn + assert vxlan_evpn['policy.groups'] == [root_key, 'policy', 'groups', 'LIST'] + + assert 'policy.switches' in vxlan_evpn + assert vxlan_evpn['policy.switches'] == [root_key, 'policy', 'switches', 'LIST'] + + +class TestIsnKeys: + """Test the ISN fabric type keys.""" + + def test_isn_topology_keys(self): + """Test ISN topology keys.""" + isn = model_keys['ISN'] + + assert 'topology' in isn + assert isn['topology'] == [root_key, 'topology', 'KEY'] + + assert 'topology.edge_connections' in isn + assert isn['topology.edge_connections'] == [root_key, 'topology', 'edge_connections', 'LIST'] + + assert 'topology.fabric_links' in isn + assert isn['topology.fabric_links'] == [root_key, 'topology', 'fabric_links', 'LIST'] + + assert 'topology.switches' in isn + assert isn['topology.switches'] == [root_key, 'topology', 'switches', 'LIST'] + + assert 'topology.switches.freeform' in isn + assert isn['topology.switches.freeform'] == [root_key, 'topology', 'switches', 'freeform', 'LIST_INDEX'] + + assert 'topology.switches.interfaces' in isn + assert isn['topology.switches.interfaces'] == [root_key, 'topology', 'switches', 'interfaces', 'LIST_INDEX'] + + assert 'topology.vpc_peers' in isn + assert isn['topology.vpc_peers'] == [root_key, 'topology', 'vpc_peers', 'LIST'] + + def test_isn_policy_keys(self): + """Test ISN policy keys.""" + isn = model_keys['ISN'] + + assert 'policy' in isn + assert isn['policy'] == [root_key, 'policy', 'KEY'] + + assert 'policy.policies' in isn + assert isn['policy.policies'] == [root_key, 'policy', 'policies', 'LIST'] + + assert 'policy.groups' in isn + assert isn['policy.groups'] == [root_key, 'policy', 'groups', 'LIST'] + + assert 'policy.switches' in isn + assert isn['policy.switches'] == [root_key, 'policy', 'switches', 'LIST'] + + def test_isn_no_global_keys(self): + 
"""Test that ISN doesn't have global keys.""" + isn = model_keys['ISN'] + + # ISN should not have global keys + assert 'global' not in isn + assert 'global.dns_servers' not in isn + + def test_isn_no_overlay_keys(self): + """Test that ISN doesn't have overlay keys.""" + isn = model_keys['ISN'] + + # ISN should not have overlay keys + assert 'overlay' not in isn + assert 'overlay.vrfs' not in isn + + +class TestExternalKeys: + """Test the External fabric type keys.""" + + def test_external_topology_keys(self): + """Test External topology keys.""" + external = model_keys['External'] + + assert 'topology' in external + assert external['topology'] == [root_key, 'topology', 'KEY'] + + assert 'topology.edge_connections' in external + assert external['topology.edge_connections'] == [root_key, 'topology', 'edge_connections', 'LIST'] + + assert 'topology.fabric_links' in external + assert external['topology.fabric_links'] == [root_key, 'topology', 'fabric_links', 'LIST'] + + assert 'topology.switches' in external + assert external['topology.switches'] == [root_key, 'topology', 'switches', 'LIST'] + + assert 'topology.switches.freeform' in external + assert external['topology.switches.freeform'] == [root_key, 'topology', 'switches', 'freeform', 'LIST_INDEX'] + + assert 'topology.switches.interfaces' in external + assert external['topology.switches.interfaces'] == [root_key, 'topology', 'switches', 'interfaces', 'LIST_INDEX'] + + assert 'topology.vpc_peers' in external + assert external['topology.vpc_peers'] == [root_key, 'topology', 'vpc_peers', 'LIST'] + + def test_external_policy_keys(self): + """Test External policy keys.""" + external = model_keys['External'] + + assert 'policy' in external + assert external['policy'] == [root_key, 'policy', 'KEY'] + + assert 'policy.policies' in external + assert external['policy.policies'] == [root_key, 'policy', 'policies', 'LIST'] + + assert 'policy.groups' in external + assert external['policy.groups'] == [root_key, 'policy', 'groups', 'LIST'] + + assert 'policy.switches' in external + assert external['policy.switches'] == [root_key, 'policy', 'switches', 'LIST'] + + def test_external_structure_similar_to_isn(self): + """Test that External structure is similar to ISN.""" + external = model_keys['External'] + isn = model_keys['ISN'] + + # External should have the same keys as ISN + assert set(external.keys()) == set(isn.keys()) + + # Values should be identical + for key in external.keys(): + assert external[key] == isn[key] + + +class TestMsdKeys: + """Test the MSD fabric type keys.""" + + def test_msd_multisite_keys(self): + """Test MSD multisite keys.""" + msd = model_keys['MSD'] + + assert 'multisite' in msd + assert msd['multisite'] == [root_key, 'multisite', 'KEY'] + + assert 'multisite.child_fabrics' in msd + assert msd['multisite.child_fabrics'] == [root_key, 'multisite', 'child_fabrics', 'KEY'] + + def test_msd_multisite_overlay_keys(self): + """Test MSD multisite overlay keys.""" + msd = model_keys['MSD'] + + assert 'multisite.overlay' in msd + assert msd['multisite.overlay'] == [root_key, 'multisite', 'overlay', 'KEY'] + + assert 'multisite.overlay.vrfs' in msd + assert msd['multisite.overlay.vrfs'] == [root_key, 'multisite', 'overlay', 'vrfs', 'LIST'] + + assert 'multisite.overlay.vrf_attach_groups' in msd + assert msd['multisite.overlay.vrf_attach_groups'] == [root_key, 'multisite', 'overlay', 'vrf_attach_groups', 'LIST'] + + assert 'multisite.overlay.vrf_attach_groups.switches' in msd + assert msd['multisite.overlay.vrf_attach_groups.switches'] 
== [root_key, 'multisite', 'overlay', 'vrf_attach_groups', 'switches', 'LIST_INDEX'] + + assert 'multisite.overlay.networks' in msd + assert msd['multisite.overlay.networks'] == [root_key, 'multisite', 'overlay', 'networks', 'LIST'] + + assert 'multisite.overlay.network_attach_groups' in msd + assert msd['multisite.overlay.network_attach_groups'] == [root_key, 'multisite', 'overlay', 'network_attach_groups', 'LIST'] + + assert 'multisite.overlay.network_attach_groups.switches' in msd + assert msd['multisite.overlay.network_attach_groups.switches'] == [root_key, 'multisite', 'overlay', 'network_attach_groups', 'switches', 'LIST_INDEX'] + + def test_msd_unique_structure(self): + """Test that MSD has unique structure compared to other fabric types.""" + msd = model_keys['MSD'] + + # MSD should have multisite keys that other fabric types don't have + assert 'multisite' in msd + assert 'multisite.child_fabrics' in msd + + # MSD should not have topology or policy keys like other fabric types + assert 'topology' not in msd + assert 'policy' not in msd + + # MSD should not have global keys + assert 'global' not in msd + + +class TestMcfKeys: + """Test the MCF fabric type keys.""" + + def test_mcf_empty_structure(self): + """Test that MCF has empty structure.""" + mcf = model_keys['MCF'] + + # MCF should be empty dictionary + assert mcf == {} + assert len(mcf) == 0 + + def test_mcf_no_common_keys(self): + """Test that MCF doesn't have common keys.""" + mcf = model_keys['MCF'] + + # MCF should not have any of the common keys + assert 'topology' not in mcf + assert 'policy' not in mcf + assert 'global' not in mcf + assert 'overlay' not in mcf + assert 'multisite' not in mcf + + +class TestKeyPatterns: + """Test patterns and consistency in key structures.""" + + def test_key_path_structure(self): + """Test that all key paths follow expected structure.""" + for fabric_type, fabric_keys in model_keys.items(): + for key_name, key_path in fabric_keys.items(): + # All key paths should be lists + assert isinstance(key_path, list), f"Key path for {fabric_type}.{key_name} should be a list" + + # All key paths should start with root_key + assert key_path[0] == root_key, f"Key path for {fabric_type}.{key_name} should start with root_key" + + # All key paths should have a type indicator as the last element + assert key_path[-1] in ['KEY', 'LIST', 'LIST_INDEX'], f"Key path for {fabric_type}.{key_name} should end with a valid type" + + def test_key_type_consistency(self): + """Test that key types are used consistently.""" + key_types = set() + + for fabric_type, fabric_keys in model_keys.items(): + for key_name, key_path in fabric_keys.items(): + key_types.add(key_path[-1]) + + # Should only have these three types + assert key_types == {'KEY', 'LIST', 'LIST_INDEX'} + + def test_dot_notation_consistency(self): + """Test that dot notation is used consistently.""" + for fabric_type, fabric_keys in model_keys.items(): + for key_name, key_path in fabric_keys.items(): + # Count dots in key_name + dot_count = key_name.count('.') + + # Key path length should be dot_count + 2 (root_key + type indicator) + expected_length = dot_count + 3 # root_key + path_parts + type + assert len(key_path) == expected_length, f"Key path length mismatch for {fabric_type}.{key_name}" + + def test_common_topology_structure(self): + """Test that topology structures are consistent across fabric types.""" + topology_fabric_types = ['VXLAN_EVPN', 'ISN', 'External'] + + common_topology_keys = [ + 'topology.edge_connections', + 
'topology.fabric_links', + 'topology.switches', + 'topology.switches.freeform', + 'topology.switches.interfaces', + 'topology.vpc_peers' + ] + + for fabric_type in topology_fabric_types: + fabric_keys = model_keys[fabric_type] + + for topology_key in common_topology_keys: + assert topology_key in fabric_keys, f"{fabric_type} should have {topology_key}" + + def test_common_policy_structure(self): + """Test that policy structures are consistent across fabric types.""" + policy_fabric_types = ['VXLAN_EVPN', 'ISN', 'External'] + + common_policy_keys = [ + 'policy.policies', + 'policy.groups', + 'policy.switches' + ] + + for fabric_type in policy_fabric_types: + fabric_keys = model_keys[fabric_type] + + for policy_key in common_policy_keys: + assert policy_key in fabric_keys, f"{fabric_type} should have {policy_key}" + + def test_list_index_usage(self): + """Test that LIST_INDEX is used appropriately.""" + list_index_keys = [] + + for fabric_type, fabric_keys in model_keys.items(): + for key_name, key_path in fabric_keys.items(): + if key_path[-1] == 'LIST_INDEX': + list_index_keys.append(f"{fabric_type}.{key_name}") + + # LIST_INDEX should be used for nested list items + expected_patterns = [ + 'switches.freeform', + 'switches.interfaces', + 'vrf_attach_groups.switches', + 'network_attach_groups.switches' + ] + + for list_index_key in list_index_keys: + # Check that the key contains one of the expected patterns + assert any(pattern in list_index_key for pattern in expected_patterns), \ + f"LIST_INDEX key {list_index_key} should match expected patterns" + + +class TestDataModelKeysIntegration: + """Integration tests for data model keys.""" + + def test_key_path_reconstruction(self): + """Test that key paths can be used to reconstruct data model paths.""" + # Test a few key paths + test_cases = [ + ('VXLAN_EVPN', 'global.dns_servers', ['vxlan', 'global', 'dns_servers']), + ('VXLAN_EVPN', 'topology.switches', ['vxlan', 'topology', 'switches']), + ('MSD', 'multisite.overlay.vrfs', ['vxlan', 'multisite', 'overlay', 'vrfs']), + ] + + for fabric_type, key_name, expected_path in test_cases: + if key_name in model_keys[fabric_type]: + key_path = model_keys[fabric_type][key_name] + actual_path = key_path[:-1] # Remove the type indicator + assert actual_path == expected_path, \ + f"Key path for {fabric_type}.{key_name} should reconstruct to {expected_path}" + + def test_fabric_type_coverage(self): + """Test that all fabric types have appropriate coverage.""" + fabric_coverage = {} + + for fabric_type, fabric_keys in model_keys.items(): + fabric_coverage[fabric_type] = len(fabric_keys) + + # VXLAN_EVPN should have the most keys (it's the most comprehensive) + assert fabric_coverage['VXLAN_EVPN'] > 0 + + # MCF should have no keys (it's empty) + assert fabric_coverage['MCF'] == 0 + + # MSD should have multisite-specific keys + assert fabric_coverage['MSD'] > 0 + + # ISN and External should have similar coverage + assert fabric_coverage['ISN'] > 0 + assert fabric_coverage['External'] > 0 + + def test_key_uniqueness_within_fabric_type(self): + """Test that keys are unique within each fabric type.""" + for fabric_type, fabric_keys in model_keys.items(): + key_names = list(fabric_keys.keys()) + unique_key_names = list(set(key_names)) + + assert len(key_names) == len(unique_key_names), \ + f"Duplicate keys found in {fabric_type}: {set([x for x in key_names if key_names.count(x) > 1])}" + + def test_realistic_key_usage_patterns(self): + """Test realistic patterns for how keys might be used.""" + # Test accessing 
nested data using key paths + mock_data = { + 'vxlan': { + 'global': { + 'dns_servers': ['8.8.8.8', '8.8.4.4'] + }, + 'topology': { + 'switches': [ + {'name': 'leaf-01', 'role': 'leaf'} + ] + } + } + } + + # Test VXLAN_EVPN global.dns_servers key + key_path = model_keys['VXLAN_EVPN']['global.dns_servers'] + path_without_type = key_path[:-1] # Remove 'LIST' + + # Navigate through the mock data + current_data = mock_data + for path_part in path_without_type: + current_data = current_data[path_part] + + assert current_data == ['8.8.8.8', '8.8.4.4'] + + # Test topology.switches key + key_path = model_keys['VXLAN_EVPN']['topology.switches'] + path_without_type = key_path[:-1] # Remove 'LIST' + + current_data = mock_data + for path_part in path_without_type: + current_data = current_data[path_part] + + assert current_data == [{'name': 'leaf-01', 'role': 'leaf'}] + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/tests/unit/plugins/plugin_utils/test_helper_functions.py b/tests/unit/plugins/plugin_utils/test_helper_functions.py new file mode 100644 index 00000000..49d49047 --- /dev/null +++ b/tests/unit/plugins/plugin_utils/test_helper_functions.py @@ -0,0 +1,917 @@ +# Copyright (c) 2025 Cisco Systems, Inc. and its affiliates +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +# the Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# +# SPDX-License-Identifier: MIT + +""" +Unit tests for the helper_functions plugin_utils module. 
+""" + +import sys +import os +import pytest +from unittest.mock import Mock, patch + +# Add plugin_utils to Python path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', 'plugins', 'plugin_utils')) + +try: + from helper_functions import ( + data_model_key_check, + hostname_to_ip_mapping, + ndfc_get_switch_policy, + ndfc_get_switch_policy_using_template, + ndfc_get_switch_policy_using_desc, + ndfc_get_fabric_attributes, + ndfc_get_fabric_switches + ) +except ImportError: + # Alternative import path for testing + import importlib.util + spec = importlib.util.spec_from_file_location( + "helper_functions", + os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', 'plugins', 'plugin_utils', 'helper_functions.py') + ) + helper_functions_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(helper_functions_module) + + data_model_key_check = helper_functions_module.data_model_key_check + hostname_to_ip_mapping = helper_functions_module.hostname_to_ip_mapping + ndfc_get_switch_policy = helper_functions_module.ndfc_get_switch_policy + ndfc_get_switch_policy_using_template = helper_functions_module.ndfc_get_switch_policy_using_template + ndfc_get_switch_policy_using_desc = helper_functions_module.ndfc_get_switch_policy_using_desc + ndfc_get_fabric_attributes = helper_functions_module.ndfc_get_fabric_attributes + ndfc_get_fabric_switches = helper_functions_module.ndfc_get_fabric_switches + + +class TestDataModelKeyCheck: + """Test the data_model_key_check function.""" + + def test_data_model_key_check_all_keys_found_with_data(self): + """Test when all keys are found and contain data.""" + tested_object = { + 'vxlan': { + 'global': { + 'dns_servers': ['8.8.8.8', '8.8.4.4'] + } + } + } + keys = ['vxlan', 'global', 'dns_servers'] + + result = data_model_key_check(tested_object, keys) + + assert result['keys_found'] == ['vxlan', 'global', 'dns_servers'] + assert result['keys_not_found'] == [] + assert result['keys_data'] == ['vxlan', 'global', 'dns_servers'] + assert result['keys_no_data'] == [] + + def test_data_model_key_check_all_keys_found_with_empty_data(self): + """Test when all keys are found but some contain no data.""" + tested_object = { + 'vxlan': { + 'global': { + 'dns_servers': [] + } + } + } + keys = ['vxlan', 'global', 'dns_servers'] + + result = data_model_key_check(tested_object, keys) + + assert result['keys_found'] == ['vxlan', 'global', 'dns_servers'] + assert result['keys_not_found'] == [] + assert result['keys_data'] == ['vxlan', 'global'] + assert result['keys_no_data'] == ['dns_servers'] + + def test_data_model_key_check_some_keys_not_found(self): + """Test when some keys are not found.""" + tested_object = { + 'vxlan': { + 'global': {} + } + } + keys = ['vxlan', 'global', 'dns_servers'] + + result = data_model_key_check(tested_object, keys) + + assert result['keys_found'] == ['vxlan', 'global'] + assert result['keys_not_found'] == ['dns_servers'] + assert result['keys_data'] == ['vxlan'] + assert result['keys_no_data'] == ['global'] + + def test_data_model_key_check_no_keys_found(self): + """Test when no keys are found.""" + tested_object = {} + keys = ['vxlan', 'global', 'dns_servers'] + + result = data_model_key_check(tested_object, keys) + + assert result['keys_found'] == [] + assert result['keys_not_found'] == ['vxlan', 'global', 'dns_servers'] + assert result['keys_data'] == [] + assert result['keys_no_data'] == [] + + def test_data_model_key_check_empty_keys_list(self): + """Test with empty keys list.""" + 
tested_object = {'vxlan': {'global': {}}} + keys = [] + + result = data_model_key_check(tested_object, keys) + + assert result['keys_found'] == [] + assert result['keys_not_found'] == [] + assert result['keys_data'] == [] + assert result['keys_no_data'] == [] + + def test_data_model_key_check_none_tested_object(self): + """Test with None tested_object.""" + tested_object = None + keys = ['vxlan', 'global'] + + result = data_model_key_check(tested_object, keys) + + assert result['keys_found'] == [] + assert result['keys_not_found'] == ['vxlan', 'global'] + assert result['keys_data'] == [] + assert result['keys_no_data'] == [] + + def test_data_model_key_check_single_key(self): + """Test with a single key.""" + tested_object = {'vxlan': {'data': 'test'}} + keys = ['vxlan'] + + result = data_model_key_check(tested_object, keys) + + assert result['keys_found'] == ['vxlan'] + assert result['keys_not_found'] == [] + assert result['keys_data'] == ['vxlan'] + assert result['keys_no_data'] == [] + + def test_data_model_key_check_nested_empty_objects(self): + """Test with nested empty objects.""" + tested_object = { + 'vxlan': { + 'global': { + 'dns_servers': { + 'primary': None + } + } + } + } + keys = ['vxlan', 'global', 'dns_servers', 'primary'] + + result = data_model_key_check(tested_object, keys) + + assert result['keys_found'] == ['vxlan', 'global', 'dns_servers', 'primary'] + assert result['keys_not_found'] == [] + assert result['keys_data'] == ['vxlan', 'global', 'dns_servers'] + assert result['keys_no_data'] == ['primary'] + + +class TestHostnameToIpMapping: + """Test the hostname_to_ip_mapping function.""" + + def test_hostname_to_ip_mapping_ipv4(self): + """Test hostname to IPv4 mapping.""" + model_data = { + 'vxlan': { + 'topology': { + 'switches': [ + { + 'name': 'leaf-01', + 'management': { + 'management_ipv4_address': '192.168.1.10' + } + }, + { + 'name': 'leaf-02', + 'management': { + 'management_ipv4_address': '192.168.1.11' + } + } + ] + }, + 'policy': { + 'switches': [ + {'name': 'leaf-01'}, + {'name': 'leaf-02'} + ] + } + } + } + + result = hostname_to_ip_mapping(model_data) + + assert result['vxlan']['policy']['switches'][0]['mgmt_ip_address'] == '192.168.1.10' + assert result['vxlan']['policy']['switches'][1]['mgmt_ip_address'] == '192.168.1.11' + + def test_hostname_to_ip_mapping_ipv6(self): + """Test hostname to IPv6 mapping.""" + model_data = { + 'vxlan': { + 'topology': { + 'switches': [ + { + 'name': 'leaf-01', + 'management': { + 'management_ipv6_address': '2001:db8::10' + } + } + ] + }, + 'policy': { + 'switches': [ + {'name': 'leaf-01'} + ] + } + } + } + + result = hostname_to_ip_mapping(model_data) + + assert result['vxlan']['policy']['switches'][0]['mgmt_ip_address'] == '2001:db8::10' + + def test_hostname_to_ip_mapping_ipv4_preferred_over_ipv6(self): + """Test that IPv4 is preferred over IPv6 when both exist.""" + model_data = { + 'vxlan': { + 'topology': { + 'switches': [ + { + 'name': 'leaf-01', + 'management': { + 'management_ipv4_address': '192.168.1.10', + 'management_ipv6_address': '2001:db8::10' + } + } + ] + }, + 'policy': { + 'switches': [ + {'name': 'leaf-01'} + ] + } + } + } + + result = hostname_to_ip_mapping(model_data) + + assert result['vxlan']['policy']['switches'][0]['mgmt_ip_address'] == '192.168.1.10' + + def test_hostname_to_ip_mapping_no_matching_topology_switch(self): + """Test when policy switch doesn't exist in topology.""" + model_data = { + 'vxlan': { + 'topology': { + 'switches': [ + { + 'name': 'leaf-01', + 'management': { + 
'management_ipv4_address': '192.168.1.10'
+                            }
+                        }
+                    ]
+                },
+                'policy': {
+                    'switches': [
+                        {'name': 'leaf-02'}
+                    ]
+                }
+            }
+        }
+
+        result = hostname_to_ip_mapping(model_data)
+
+        assert 'mgmt_ip_address' not in result['vxlan']['policy']['switches'][0]
+
+    def test_hostname_to_ip_mapping_no_management_ip(self):
+        """Test when topology switch has no management IP."""
+        model_data = {
+            'vxlan': {
+                'topology': {
+                    'switches': [
+                        {
+                            'name': 'leaf-01',
+                            'management': {}
+                        }
+                    ]
+                },
+                'policy': {
+                    'switches': [
+                        {'name': 'leaf-01'}
+                    ]
+                }
+            }
+        }
+
+        result = hostname_to_ip_mapping(model_data)
+
+        assert 'mgmt_ip_address' not in result['vxlan']['policy']['switches'][0]
+
+    def test_hostname_to_ip_mapping_empty_switches(self):
+        """Test with empty switches lists."""
+        model_data = {
+            'vxlan': {
+                'topology': {
+                    'switches': []
+                },
+                'policy': {
+                    'switches': []
+                }
+            }
+        }
+
+        result = hostname_to_ip_mapping(model_data)
+
+        assert result['vxlan']['policy']['switches'] == []
+
+    def test_hostname_to_ip_mapping_preserves_existing_data(self):
+        """Test that existing data is preserved."""
+        model_data = {
+            'vxlan': {
+                'topology': {
+                    'switches': [
+                        {
+                            'name': 'leaf-01',
+                            'management': {
+                                'management_ipv4_address': '192.168.1.10'
+                            }
+                        }
+                    ]
+                },
+                'policy': {
+                    'switches': [
+                        {
+                            'name': 'leaf-01',
+                            'existing_field': 'existing_value'
+                        }
+                    ]
+                }
+            }
+        }
+
+        result = hostname_to_ip_mapping(model_data)
+
+        assert result['vxlan']['policy']['switches'][0]['mgmt_ip_address'] == '192.168.1.10'
+        assert result['vxlan']['policy']['switches'][0]['existing_field'] == 'existing_value'
+
+
+class TestNdfc_GetSwitchPolicy:
+    """Test the ndfc_get_switch_policy function."""
+
+    def test_ndfc_get_switch_policy_success(self):
+        """Test successful switch policy retrieval."""
+        mock_self = Mock()
+        mock_self._execute_module.return_value = {
+            'response': {
+                'DATA': [
+                    {
+                        'serialNumber': 'ABC123',
+                        'templateName': 'leaf_template',
+                        'policy': 'test_policy'
+                    }
+                ]
+            }
+        }
+
+        task_vars = {'ansible_host': 'test_host'}
+        tmp = None
+        switch_serial = 'ABC123'
+
+        result = ndfc_get_switch_policy(mock_self, task_vars, tmp, switch_serial)
+
+        mock_self._execute_module.assert_called_once_with(
+            module_name="cisco.dcnm.dcnm_rest",
+            module_args={
+                "method": "GET",
+                "path": f"/appcenter/cisco/ndfc/api/v1/lan-fabric/rest/control/policies/switches/{switch_serial}/SWITCH/SWITCH"
+            },
+            task_vars=task_vars,
+            tmp=tmp
+        )
+
+        assert result['response']['DATA'][0]['serialNumber'] == 'ABC123'
+
+    def test_ndfc_get_switch_policy_module_call_parameters(self):
+        """Test that module is called with correct parameters."""
+        mock_self = Mock()
+        mock_self._execute_module.return_value = {'response': {'DATA': []}}
+
+        task_vars = {'test_var': 'test_value'}
+        tmp = 'test_tmp'
+        switch_serial = 'XYZ789'
+
+        ndfc_get_switch_policy(mock_self, task_vars, tmp, switch_serial)
+
+        expected_path = f"/appcenter/cisco/ndfc/api/v1/lan-fabric/rest/control/policies/switches/{switch_serial}/SWITCH/SWITCH"
+        mock_self._execute_module.assert_called_once_with(
+            module_name="cisco.dcnm.dcnm_rest",
+            module_args={
+                "method": "GET",
+                "path": expected_path
+            },
+            task_vars=task_vars,
+            tmp=tmp
+        )
+
+
+class TestNdfc_GetSwitchPolicyUsingTemplate:
+    """Test the ndfc_get_switch_policy_using_template function."""
+
+    def test_ndfc_get_switch_policy_using_template_success(self):
+        """Test successful policy retrieval using template."""
+        mock_self = Mock()
+
+        # Mock the response from ndfc_get_switch_policy
+        with patch('helper_functions.ndfc_get_switch_policy') as mock_get_policy:
+            mock_get_policy.return_value = {
+                'response': {
+                    'DATA': [
+                        {
+                            'serialNumber': 'ABC123',
+                            'templateName': 'leaf_template',
+                            'policy': 'test_policy'
+                        },
+                        {
+                            'serialNumber': 'ABC123',
+                            'templateName': 'spine_template',
+                            'policy': 'other_policy'
+                        }
+                    ]
+                }
+            }
+
+            task_vars = {'ansible_host': 'test_host'}
+            tmp = None
+            switch_serial = 'ABC123'
+            template_name = 'leaf_template'
+
+            result = ndfc_get_switch_policy_using_template(mock_self, task_vars, tmp, switch_serial, template_name)
+
+            mock_get_policy.assert_called_once_with(mock_self, task_vars, tmp, switch_serial)
+            assert result['serialNumber'] == 'ABC123'
+            assert result['templateName'] == 'leaf_template'
+            assert result['policy'] == 'test_policy'
+
+    def test_ndfc_get_switch_policy_using_template_not_found(self):
+        """Test when template is not found."""
+        mock_self = Mock()
+
+        with patch('helper_functions.ndfc_get_switch_policy') as mock_get_policy:
+            mock_get_policy.return_value = {
+                'response': {
+                    'DATA': [
+                        {
+                            'serialNumber': 'ABC123',
+                            'templateName': 'spine_template',
+                            'policy': 'other_policy'
+                        }
+                    ]
+                }
+            }
+
+            task_vars = {'ansible_host': 'test_host'}
+            tmp = None
+            switch_serial = 'ABC123'
+            template_name = 'leaf_template'
+
+            with pytest.raises(Exception) as exc_info:
+                ndfc_get_switch_policy_using_template(mock_self, task_vars, tmp, switch_serial, template_name)
+
+            assert "Policy for template leaf_template and switch ABC123 not found!" in str(exc_info.value)
+            assert "Please ensure switch with serial number ABC123 is part of the fabric." in str(exc_info.value)
+
+    def test_ndfc_get_switch_policy_using_template_wrong_serial(self):
+        """Test when switch serial doesn't match."""
+        mock_self = Mock()
+
+        with patch('helper_functions.ndfc_get_switch_policy') as mock_get_policy:
+            mock_get_policy.return_value = {
+                'response': {
+                    'DATA': [
+                        {
+                            'serialNumber': 'XYZ789',
+                            'templateName': 'leaf_template',
+                            'policy': 'test_policy'
+                        }
+                    ]
+                }
+            }
+
+            task_vars = {'ansible_host': 'test_host'}
+            tmp = None
+            switch_serial = 'ABC123'
+            template_name = 'leaf_template'
+
+            with pytest.raises(Exception) as exc_info:
+                ndfc_get_switch_policy_using_template(mock_self, task_vars, tmp, switch_serial, template_name)
+
+            assert "Policy for template leaf_template and switch ABC123 not found!" in str(exc_info.value)
+
+
+class TestNdfc_GetSwitchPolicyUsingDesc:
+    """Test the ndfc_get_switch_policy_using_desc function."""
+
+    def test_ndfc_get_switch_policy_using_desc_success(self):
+        """Test successful policy retrieval using description."""
+        mock_self = Mock()
+
+        with patch('helper_functions.ndfc_get_switch_policy') as mock_get_policy:
+            mock_get_policy.return_value = {
+                'response': {
+                    'DATA': [
+                        {
+                            'serialNumber': 'ABC123',
+                            'templateName': 'leaf_template',
+                            'description': 'nac-generated policy',
+                            'source': '',
+                            'policy': 'test_policy'
+                        },
+                        {
+                            'serialNumber': 'ABC123',
+                            'templateName': 'spine_template',
+                            'description': 'manual policy',
+                            'source': 'manual',
+                            'policy': 'other_policy'
+                        }
+                    ]
+                }
+            }
+
+            task_vars = {'ansible_host': 'test_host'}
+            tmp = None
+            switch_serial = 'ABC123'
+            prefix = 'nac'
+
+            result = ndfc_get_switch_policy_using_desc(mock_self, task_vars, tmp, switch_serial, prefix)
+
+            mock_get_policy.assert_called_once_with(mock_self, task_vars, tmp, switch_serial)
+            assert len(result) == 1
+            assert result[0]['templateName'] == 'leaf_template'
+            assert result[0]['description'] == 'nac-generated policy'
+            assert result[0]['source'] == ''
+
+    def test_ndfc_get_switch_policy_using_desc_no_matches(self):
+        """Test when no policies match the description prefix."""
+        mock_self = Mock()
+
+        with patch('helper_functions.ndfc_get_switch_policy') as mock_get_policy:
+            mock_get_policy.return_value = {
+                'response': {
+                    'DATA': [
+                        {
+                            'serialNumber': 'ABC123',
+                            'templateName': 'leaf_template',
+                            'description': 'manual policy',
+                            'source': 'manual',
+                            'policy': 'test_policy'
+                        }
+                    ]
+                }
+            }
+
+            task_vars = {'ansible_host': 'test_host'}
+            tmp = None
+            switch_serial = 'ABC123'
+            prefix = 'nac'
+
+            result = ndfc_get_switch_policy_using_desc(mock_self, task_vars, tmp, switch_serial, prefix)
+
+            assert result == []
+
+    def test_ndfc_get_switch_policy_using_desc_no_description(self):
+        """Test when policies have no description."""
+        mock_self = Mock()
+
+        with patch('helper_functions.ndfc_get_switch_policy') as mock_get_policy:
+            mock_get_policy.return_value = {
+                'response': {
+                    'DATA': [
+                        {
+                            'serialNumber': 'ABC123',
+                            'templateName': 'leaf_template',
+                            'source': '',
+                            'policy': 'test_policy'
+                        }
+                    ]
+                }
+            }
+
+            task_vars = {'ansible_host': 'test_host'}
+            tmp = None
+            switch_serial = 'ABC123'
+            prefix = 'nac'
+
+            result = ndfc_get_switch_policy_using_desc(mock_self, task_vars, tmp, switch_serial, prefix)
+
+            assert result == []
+
+    def test_ndfc_get_switch_policy_using_desc_non_empty_source(self):
+        """Test that policies with non-empty source are filtered out."""
+        mock_self = Mock()
+
+        with patch('helper_functions.ndfc_get_switch_policy') as mock_get_policy:
+            mock_get_policy.return_value = {
+                'response': {
+                    'DATA': [
+                        {
+                            'serialNumber': 'ABC123',
+                            'templateName': 'leaf_template',
+                            'description': 'nac-generated policy',
+                            'source': 'manual',
+                            'policy': 'test_policy'
+                        }
+                    ]
+                }
+            }
+
+            task_vars = {'ansible_host': 'test_host'}
+            tmp = None
+            switch_serial = 'ABC123'
+            prefix = 'nac'
+
+            result = ndfc_get_switch_policy_using_desc(mock_self, task_vars, tmp, switch_serial, prefix)
+
+            assert result == []
+
+
+class TestNdfc_GetFabricAttributes:
+    """Test the ndfc_get_fabric_attributes function."""
+
+    def test_ndfc_get_fabric_attributes_success(self):
+        """Test successful fabric attributes retrieval."""
+        mock_self = Mock()
+        mock_self._execute_module.return_value = {
+            'response': {
+                'DATA': {
+                    'nvPairs': {
+                        'FABRIC_NAME': 'test_fabric',
+                        'BGP_AS': '65000',
+                        'ANYCAST_GW_MAC': '0000.1111.2222'
+                    }
+                }
+            }
+        }
+
+        task_vars = {'ansible_host': 'test_host'}
+        tmp = None
+        fabric = 'test_fabric'
+
+        result = ndfc_get_fabric_attributes(mock_self, task_vars, tmp, fabric)
+
+        mock_self._execute_module.assert_called_once_with(
+            module_name="cisco.dcnm.dcnm_rest",
+            module_args={
+                "method": "GET",
+                "path": f"/appcenter/cisco/ndfc/api/v1/lan-fabric/rest/control/fabrics/{fabric}",
+            },
+            task_vars=task_vars,
+            tmp=tmp
+        )
+
+        assert result['FABRIC_NAME'] == 'test_fabric'
+        assert result['BGP_AS'] == '65000'
+        assert result['ANYCAST_GW_MAC'] == '0000.1111.2222'
+
+    def test_ndfc_get_fabric_attributes_module_call_parameters(self):
+        """Test that module is called with correct parameters."""
+        mock_self = Mock()
+        mock_self._execute_module.return_value = {
+            'response': {
+                'DATA': {
+                    'nvPairs': {}
+                }
+            }
+        }
+
+        task_vars = {'test_var': 'test_value'}
+        tmp = 'test_tmp'
+        fabric = 'production_fabric'
+
+        ndfc_get_fabric_attributes(mock_self, task_vars, tmp, fabric)
+
+        expected_path = f"/appcenter/cisco/ndfc/api/v1/lan-fabric/rest/control/fabrics/{fabric}"
+        mock_self._execute_module.assert_called_once_with(
+            module_name="cisco.dcnm.dcnm_rest",
+            module_args={
+                "method": "GET",
+                "path": expected_path,
+            },
+            task_vars=task_vars,
+            tmp=tmp
+        )
+
+
+class TestNdfc_GetFabricSwitches:
+    """Test the ndfc_get_fabric_switches function."""
+
+    def test_ndfc_get_fabric_switches_success(self):
+        """Test successful fabric switches retrieval."""
+        mock_self = Mock()
+        mock_self._execute_module.return_value = {
+            'response': [
+                {
+                    'hostName': 'leaf-01',
+                    'ipAddress': '192.168.1.10',
+                    'serialNumber': 'ABC123'
+                },
+                {
+                    'hostName': 'leaf-02',
+                    'ipAddress': '192.168.1.11',
+                    'serialNumber': 'XYZ789'
+                }
+            ]
+        }
+
+        task_vars = {'ansible_host': 'test_host'}
+        tmp = None
+        fabric = 'test_fabric'
+
+        result = ndfc_get_fabric_switches(mock_self, task_vars, tmp, fabric)
+
+        mock_self._execute_module.assert_called_once_with(
+            module_name="cisco.dcnm.dcnm_inventory",
+            module_args={
+                "fabric": fabric,
+                "state": "query"
+            },
+            task_vars=task_vars,
+            tmp=tmp
+        )
+
+        assert len(result) == 2
+        assert result[0]['hostname'] == 'leaf-01'
+        assert result[0]['mgmt_ip_address'] == '192.168.1.10'
+        assert result[1]['hostname'] == 'leaf-02'
+        assert result[1]['mgmt_ip_address'] == '192.168.1.11'
+
+    def test_ndfc_get_fabric_switches_no_hostname(self):
+        """Test fabric switches without hostname are filtered out."""
+        mock_self = Mock()
+        mock_self._execute_module.return_value = {
+            'response': [
+                {
+                    'hostName': 'leaf-01',
+                    'ipAddress': '192.168.1.10',
+                    'serialNumber': 'ABC123'
+                },
+                {
+                    'ipAddress': '192.168.1.11',
+                    'serialNumber': 'XYZ789'
+                }
+            ]
+        }
+
+        task_vars = {'ansible_host': 'test_host'}
+        tmp = None
+        fabric = 'test_fabric'
+
+        result = ndfc_get_fabric_switches(mock_self, task_vars, tmp, fabric)
+
+        assert len(result) == 1
+        assert result[0]['hostname'] == 'leaf-01'
+        assert result[0]['mgmt_ip_address'] == '192.168.1.10'
+
+    def test_ndfc_get_fabric_switches_empty_response(self):
+        """Test with empty response."""
+        mock_self = Mock()
+        mock_self._execute_module.return_value = {
+            'response': []
+        }
+
+        task_vars = {'ansible_host': 'test_host'}
+        tmp = None
+        fabric = 'test_fabric'
+
+        result = ndfc_get_fabric_switches(mock_self, task_vars, tmp, fabric)
+
+        assert result == []
+
+    def test_ndfc_get_fabric_switches_module_call_parameters(self):
+        """Test that module is called with correct parameters."""
+        mock_self = Mock()
+        mock_self._execute_module.return_value = {'response': []}
+
+        task_vars = {'test_var': 'test_value'}
+        tmp = 'test_tmp'
+        fabric = 'production_fabric'
+
+        ndfc_get_fabric_switches(mock_self, task_vars, tmp, fabric)
+
+        mock_self._execute_module.assert_called_once_with(
+            module_name="cisco.dcnm.dcnm_inventory",
+            module_args={
+                "fabric": fabric,
+                "state": "query"
+            },
+            task_vars=task_vars,
+            tmp=tmp
+        )
+
+
+class TestHelperFunctionsIntegration:
+    """Integration tests for helper functions."""
+
+    def test_data_model_key_check_integration(self):
+        """Test data_model_key_check with realistic data model."""
+        realistic_model = {
+            'vxlan': {
+                'global': {
+                    'dns_servers': ['8.8.8.8', '8.8.4.4'],
+                    'ntp_servers': [],
+                    'spanning_tree': {
+                        'mode': 'mst'
+                    }
+                },
+                'topology': {
+                    'switches': [
+                        {'name': 'leaf-01', 'role': 'leaf'},
+                        {'name': 'spine-01', 'role': 'spine'}
+                    ]
+                }
+            }
+        }
+
+        # Test found keys with data
+        keys = ['vxlan', 'global', 'dns_servers']
+        result = data_model_key_check(realistic_model, keys)
+        assert result['keys_found'] == keys
+        assert result['keys_data'] == keys
+
+        # Test found keys without data
+        keys = ['vxlan', 'global', 'ntp_servers']
+        result = data_model_key_check(realistic_model, keys)
+        assert result['keys_found'] == keys
+        assert result['keys_data'] == ['vxlan', 'global']
+        assert result['keys_no_data'] == ['ntp_servers']
+
+    def test_hostname_to_ip_mapping_integration(self):
+        """Test hostname_to_ip_mapping with realistic data model."""
+        realistic_model = {
+            'vxlan': {
+                'topology': {
+                    'switches': [
+                        {
+                            'name': 'leaf-01',
+                            'role': 'leaf',
+                            'management': {
+                                'management_ipv4_address': '192.168.1.10'
+                            }
+                        },
+                        {
+                            'name': 'spine-01',
+                            'role': 'spine',
+                            'management': {
+                                'management_ipv6_address': '2001:db8::20'
+                            }
+                        }
+                    ]
+                },
+                'policy': {
+                    'switches': [
+                        {
+                            'name': 'leaf-01',
+                            'policies': ['leaf_policy']
+                        },
+                        {
+                            'name': 'spine-01',
+                            'policies': ['spine_policy']
+                        }
+                    ]
+                }
+            }
+        }
+
+        result = hostname_to_ip_mapping(realistic_model)
+
+        # Check that IPv4 mapping was added
+        leaf_switch = next(s for s in result['vxlan']['policy']['switches'] if s['name'] == 'leaf-01')
+        assert leaf_switch['mgmt_ip_address'] == '192.168.1.10'
+
+        # Check that IPv6 mapping was added
+        spine_switch = next(s for s in result['vxlan']['policy']['switches'] if s['name'] == 'spine-01')
+        assert spine_switch['mgmt_ip_address'] == '2001:db8::20'
+
+        # Check that existing data was preserved
+        assert leaf_switch['policies'] == ['leaf_policy']
+        assert spine_switch['policies'] == ['spine_policy']
+
+
+if __name__ == "__main__":
+    pytest.main([__file__])