diff --git a/ansible_base/authentication/authenticator_plugins/local.py b/ansible_base/authentication/authenticator_plugins/local.py index 36ceca873..fee5af3f6 100644 --- a/ansible_base/authentication/authenticator_plugins/local.py +++ b/ansible_base/authentication/authenticator_plugins/local.py @@ -1,17 +1,16 @@ import logging -from urllib.parse import urljoin -import requests from django.contrib.auth import get_user_model from django.contrib.auth.backends import ModelBackend -from django.core.exceptions import ValidationError from django.utils.translation import gettext_lazy as _ -from requests.auth import HTTPBasicAuth +from rest_framework import serializers +from rest_framework.serializers import ValidationError from ansible_base.authentication.authenticator_plugins.base import AbstractAuthenticatorPlugin, BaseAuthenticatorConfiguration from ansible_base.authentication.utils.authentication import get_or_create_authenticator_user from ansible_base.authentication.utils.claims import update_user_claims -from ansible_base.lib.utils.settings import get_setting +from ansible_base.lib.serializers.fields import ListField +from ansible_base.lib.utils.imports import MODULE_PATH_PATTERN, import_object logger = logging.getLogger('ansible_base.authentication.authenticator_plugins.local') @@ -24,10 +23,42 @@ class LocalConfiguration(BaseAuthenticatorConfiguration): documentation_url = "https://docs.djangoproject.com/en/4.2/ref/contrib/auth/#django.contrib.auth.backends.ModelBackend" - def validate(self, data): - if data != {}: - raise ValidationError(_({"configuration": "Can only be {} for local authenticators"})) - return data + fallback_authentication = ListField( + help_text=_( + 'List of fallback authentication handler modules to attempt when primary authentication fails. ' + 'Each item should be a Python module path containing a FallbackAuthenticator class ' + '(e.g., "my_app.authentication.fallbacks.my_fallback_service"). ' + 'The module must contain a class named "FallbackAuthenticator". ' + 'Fallbacks are attempted in the order specified.' + ), + allow_null=True, + required=False, + default=[], + ui_field_label=_('Fallback Authentication Handlers'), + child=serializers.CharField(), + ) + + def validate(self, attrs): + """ + Validate the configuration and ensure fallback authenticators are valid module paths. + """ + # Call parent validation + attrs = super().validate(attrs) + + # Validate fallback_authentication module paths + fallback_paths = attrs.get('fallback_authentication', []) + if fallback_paths: + errors = {} + for index, path in enumerate(fallback_paths): + if not isinstance(path, str): + errors[index] = _('Must be a string representing a Python module path') + elif not MODULE_PATH_PATTERN.match(path): + errors[index] = _('Invalid module path format. Must be a valid Python module path with at least one dot (e.g., "myapp.fallbacks.handler")') + + if errors: + raise ValidationError({'fallback_authentication': errors}) + + return attrs class AuthenticatorPlugin(ModelBackend, AbstractAuthenticatorPlugin): @@ -50,25 +81,12 @@ def authenticate(self, request, username=None, password=None, **kwargs): logger.info(f"Local authenticator {self.database_instance.name} is disabled, skipping") return None + # Try standard ModelBackend authentication first user = super().authenticate(request, username, password, **kwargs) - controller_login_results = None - if ( - not user - and request - and request.path.startswith('/api/gateway/v1/login/') - and (controller_login_results := self._can_authenticate_from_controller(username, password)) - ): - logger.warning("User has been validated by controller, updating gateway user.") - self.update_gateway_user(username, password) - user = super().authenticate(request, username, password, **kwargs) - elif not user: - logger.info( - "Fallback authentication condition not met: " - f"username={username}, " - f"request={'set' if request else 'None'}, " - f"login_path={'True' if request and request.path.startswith('/api/gateway/v1/login/') else 'False'}, " - f"controller_login_results={controller_login_results}" - ) + + # If authentication failed, try fallback authenticators + if not user: + user = self._try_fallback_authenticators(request, username, password, **kwargs) # This auth class doesn't create any new local users, but we still need to make sure # it has an AuthenticatorUser associated with it. @@ -88,137 +106,63 @@ def authenticate(self, request, username=None, password=None, **kwargs): ) return update_user_claims(user, self.database_instance, []) - def _can_authenticate_from_controller(self, username, password): - """ - Check if a user exists in the AuthenticatorUser table with the local authenticator provider. - If the user is valid, update the gateway users credentials with the controller credentials. + def _try_fallback_authenticators(self, request, username, password, **kwargs): """ - try: - user = UserModel._default_manager.get_by_natural_key(username) - except UserModel.DoesNotExist: - logger.warning(f"User '{username}' does not exist in the database.") - return False - - # Skip controller authentication if user has use_controller_password field set to False - # Default to False when field doesn't exist (test environments) - if not getattr(user, 'use_controller_password', False): - logger.warning(f"User '{username}' password not in Controller.") - return False - - if controller_user := self._get_controller_user(username, password): - # Validate controller_user has a ldap_dn field, if it is not None, then the user is a local user - ldap_dn = controller_user.get("ldap_dn") - if ldap_dn is None or ldap_dn != "": - logger.warning(f"User '{username}' is an ldap user and can not be authenticated.") - return False - if controller_user.get('password', None) != "$encrypted$": - logger.warning(f"User '{username}' is an enterprise user and can not be authenticated.") - return False - return True - else: - return False + Try each configured fallback authenticator in order. - def _get_controller_user(self, username: str, password: str): - """ - Get the user from the controller by making a request to the controller API /me/ endpoint. - If the user is not found, return None. - If the user is found, return the user. - """ + Fallback authenticators are loaded as plugins from the 'fallback_authentication' configuration + field, which should be a list of module paths. Each module must contain a class named + 'FallbackAuthenticator'. - controller_base_domain = get_setting('gateway_proxy_url') - if not controller_base_domain: - logger.warning("Controller authentication failed, unable to get controller base domain") - return None - controller_url = urljoin(controller_base_domain, "/api/controller/v2/me/") - - timeout = get_setting('GRPC_SERVER_AUTH_SERVICE_TIMEOUT') - timeout = self._convert_to_seconds(timeout) - - try: - response = requests.get(controller_url, auth=HTTPBasicAuth(username, password), timeout=int(timeout)) - response.raise_for_status() - user_data = response.json() - - # Check if count exists and equals 1 - count = user_data.get("count") - if count != 1: - logger.warning(f"Unable to authenticate user '{username}' with controller.") - return None - - # Check if results exists and is a non-empty list - results = user_data.get("results") - if not results or not isinstance(results, list) or len(results) == 0: - logger.info(f"Unable to authenticate user '{username}' with controller. Invalid or empty results.") - return None - if not isinstance(results[0], dict): - logger.warning(f"Unable to authenticate user '{username}' with controller. user was not a dictionary.") - return False - - return results[0] - except requests.exceptions.HTTPError as http_err: - logger.warning(f"HTTP error occurred: {http_err}") - return None - except requests.exceptions.ConnectionError as conn_err: - logger.warning(f"Connection error occurred: {conn_err}") - return None - except requests.exceptions.Timeout as timeout_err: - logger.warning(f"Timeout error occurred: {timeout_err}") - return None - except requests.exceptions.RequestException as err: - logger.warning(f"An unexpected error occurred: {err}") - return None - except ValueError as json_err: - logger.warning(f"JSON decode error occurred: {json_err}") - return None - except Exception as err: - logger.warning(f"An unexpected error occurred: {err}") - return None + Example: + Configuration: ['my_service.authentication.fallbacks.my_fallback_service'] + Loads: my_service.authentication.fallbacks.my_fallback_service.FallbackAuthenticator - def update_gateway_user(self, username, password): - """ - Update the gateway user with the controller credentials and set is_partially_migrated to False. - """ - user = UserModel._default_manager.get_by_natural_key(username) - user.set_password(password) + Each fallback authenticator is instantiated and checked to see if it should be attempted + using its should_attempt() method. If so, its authenticate() method is called. - # Set use_controller_password to False if the field exists - update_fields = ['password'] - if hasattr(user, 'use_controller_password'): - user.use_controller_password = False - update_fields.append('use_controller_password') + If a fallback authenticator returns a user, we use that user. Otherwise, we continue + to the next fallback authenticator. - user.save(update_fields=update_fields) - logger.info(f"Updated user {username} gateway account") + Args: + request: The HTTP request object + username: The username to authenticate + password: The password to authenticate + **kwargs: Additional authentication parameters - def _convert_to_seconds(self, s): - """ - Converts a time string like '15s', '5m', '1h', '2d', '3w' to seconds. + Returns: + The authenticated user object if successful, None otherwise """ - default = 10 - try: - unit = s[-1].lower() - value = int(s[:-1]) - - ret_val = 0 - # Check units - if unit == '-': - ret_val = default - elif unit == 's': - ret_val = value - elif unit == 'm': - ret_val = value * 60 - elif unit == 'h': - ret_val = value * 3600 # 60 * 60 - elif unit == 'd': - ret_val = value * 86400 # 60 * 60 * 24 - elif unit == 'w': - ret_val = value * 604800 # 60 * 60 * 24 * 7 - else: - ret_val = int(s) - # If less than or equal to 0, return default - if ret_val <= 0: - ret_val = default - return ret_val - except Exception: - logger.warning(f"Invalid duration format: '{s}'") - return default + configuration = self.database_instance.configuration if self.database_instance else {} + fallback_paths = configuration.get('fallback_authentication', []) + + for module_path in fallback_paths: + try: + # Load the fallback plugin (must contain a class named 'FallbackAuthenticator') + fallback_class = import_object(module_path, 'FallbackAuthenticator') + + # Instantiate the fallback authenticator + fallback_authenticator = fallback_class() + + logger.info(f"Attempting fallback authenticator: {module_path}") + + # Try the fallback authentication + user = fallback_authenticator.authenticate(request, username, password, **kwargs) + + # If fallback returned a user, use it + if user: + logger.info(f"Fallback authenticator {module_path} returned user {user.username}") + return user + + except (ImportError, AttributeError, ValueError) as e: + logger.error(f"Failed to load fallback authenticator plugin from {module_path}: {e}") + continue + except Exception as e: + logger.error(f"Error in fallback authenticator {module_path}: {e}") + continue + + if not fallback_paths: + logger.debug("No fallback authenticators configured") + else: + logger.debug("All fallback authenticators exhausted, authentication failed") + return None diff --git a/ansible_base/authentication/authenticator_plugins/utils.py b/ansible_base/authentication/authenticator_plugins/utils.py index d3db5194c..2e6b8b40a 100644 --- a/ansible_base/authentication/authenticator_plugins/utils.py +++ b/ansible_base/authentication/authenticator_plugins/utils.py @@ -8,6 +8,8 @@ from django.db.models.fields import uuid from django.utils.text import slugify +from ansible_base.lib.utils.imports import import_object + logger = logging.getLogger('ansible_base.authentication.authenticator_plugins.utils') setting = 'ANSIBLE_BASE_AUTHENTICATOR_CLASS_PREFIXES' @@ -33,9 +35,8 @@ def get_authenticator_class(authenticator_type: str): raise ImportError("Must pass authenticator type to import") try: logger.debug(f"Attempting to load class {authenticator_type}") - auth_class = __import__(authenticator_type, globals(), locals(), ['AuthenticatorPlugin'], 0) - return auth_class.AuthenticatorPlugin - except Exception as e: + return import_object(authenticator_type, 'AuthenticatorPlugin') + except (ValueError, ImportError, AttributeError) as e: logger.exception(f"The specified authenticator type {authenticator_type} could not be loaded, see exception below") raise ImportError(f"The specified authenticator type {authenticator_type} could not be loaded") from e diff --git a/ansible_base/lib/utils/duration.py b/ansible_base/lib/utils/duration.py new file mode 100644 index 000000000..698af8637 --- /dev/null +++ b/ansible_base/lib/utils/duration.py @@ -0,0 +1,83 @@ +""" +Utility functions for parsing and converting duration/time strings. +""" + +import logging +import re +from typing import Optional + +logger = logging.getLogger('ansible_base.lib.utils.duration') + + +DURATION_CHAR_TO_SECONDS = { + 's': 1, + 'm': 60, + 'h': 3600, + 'd': 86400, + 'w': 604800, +} + +DURATION_RE = re.compile(r"^(-?\d+)([smhdw]?)$") + + +def convert_to_seconds(duration_string: Optional[str], default: int = 10) -> int: + """ + Converts a duration string like '15s', '5m', '1h', '2d', '3w' to seconds. + + This function parses duration strings and converts them to seconds. It allows + negative values, leaving validation to the caller based on their use case. + + Args: + duration_string: A string representing a duration with a unit suffix. + Supported units: s (seconds), m (minutes), h (hours), + d (days), w (weeks). Can also be a plain integer string + for seconds. Negative values are supported. Case-insensitive. + default: The default value to return if the input is invalid or cannot + be parsed. Must be an integer. Defaults to 10 seconds. If a non-integer + value is provided, a warning with stack trace is logged and 10 is used instead. + + Returns: + int: The duration in seconds (can be negative), or the default value if invalid. + + Examples: + >>> convert_to_seconds('15s') + 15 + >>> convert_to_seconds('5m') + 300 + >>> convert_to_seconds('1h') + 3600 + >>> convert_to_seconds('2d') + 172800 + >>> convert_to_seconds('1w') + 604800 + >>> convert_to_seconds('30') + 30 + >>> convert_to_seconds('-5s') + -5 + >>> convert_to_seconds('-1d') + -86400 + >>> convert_to_seconds('invalid') + 10 + >>> convert_to_seconds('invalid', default=42) + 42 + >>> convert_to_seconds('invalid', default='not_an_int') # Logs warning with stack trace, returns 10 + 10 + """ + # Validate that default is an integer (but not a boolean, which is a subclass of int in Python) + if isinstance(default, bool) or not isinstance(default, int): + logger.warning(f"Invalid default value: '{default}' (type: {type(default).__name__}). Must be an integer. Using default of 10.", stack_info=True) + default = 10 + + try: + if duration_string is None: + raise ValueError("Duration string is None") + + if matches := DURATION_RE.match(duration_string.lower()): + number = int(matches.group(1)) # The numeric part (can be negative) + unit = matches.group(2) or 's' # The unit character, default to 's' + return number * DURATION_CHAR_TO_SECONDS[unit] + else: + raise ValueError("Invalid duration format") + except Exception as e: + logger.warning(f"Invalid duration format: '{duration_string}' ({e}), return default of {default}") + return default diff --git a/ansible_base/lib/utils/imports.py b/ansible_base/lib/utils/imports.py new file mode 100644 index 000000000..1fb4ccf47 --- /dev/null +++ b/ansible_base/lib/utils/imports.py @@ -0,0 +1,78 @@ +""" +Utility functions for dynamically importing Python modules and objects. +""" + +import importlib +import re +from typing import Any, Optional + +# Pattern components for Python identifiers +# Python identifiers must start with a letter or underscore +_IDENTIFIER_START = r'[a-zA-Z_]' +# Pattern string for a single module path segment (identifier) +_MODULE_SEGMENT = rf'{_IDENTIFIER_START}\w*' + +# Pattern for valid Python module paths: +# - Each segment must start with a letter or underscore +# - Followed by letters, digits, or underscores (\w) +# - Must have at least one dot separating segments +# Used for validating module paths before attempting imports +MODULE_PATH_PATTERN = re.compile(rf'^{_MODULE_SEGMENT}(\.{_MODULE_SEGMENT})+$') + +# Pattern for full import paths (module.path.Attribute) +# Captures module path in group(1) and attribute name in group(2) +# Requires at least one dot separator between module and attribute +# Reuses _MODULE_SEGMENT for consistency +FULL_IMPORT_PATTERN = re.compile(rf'^({_MODULE_SEGMENT}(?:\.{_MODULE_SEGMENT})*)\.({_MODULE_SEGMENT})$') + + +def import_object(import_path: str, default_attr: Optional[str] = None) -> Any: + """ + Import a class, function, or object from a module path. + + This function provides a unified way to dynamically import objects from modules, + supporting two different invocation patterns for maximum flexibility. + + Supports two formats: + 1. Full path with attribute: 'module.path.ClassName' + 2. Separate module and attribute: ('module.path', 'ClassName') + + Args: + import_path: Module path, optionally including the attribute name at the end. + When default_attr is provided, this should be the module path only. + default_attr: If provided, treat import_path as module-only and use this as + the attribute name to retrieve from the module. + + Returns: + The imported object (class, function, constant, etc.) + + Raises: + ImportError: If the module cannot be imported + AttributeError: If the attribute doesn't exist in the module + ValueError: If import_path is invalid (e.g., doesn't contain a dot when default_attr is None) + + Examples: + Import a settings object using full path: + >>> import_object('django.conf.settings') + + + Import a class using module path and attribute name: + >>> import_object('my_app.authentication.fallbacks.controller', 'FallbackAuthenticator') + + + Import a function using full path: + >>> import_object('django.utils.text.slugify') + + """ + if default_attr: + import_path = f"{import_path}.{default_attr}" + + matches = FULL_IMPORT_PATTERN.match(import_path) + if not matches: + raise ValueError( + f"Invalid import path: '{import_path}'. " + "Must be a valid Python module path with at least one dot (e.g., 'module.attribute' or 'module.submodule.attribute')." + ) + + module = importlib.import_module(matches.group(1)) + return getattr(module, matches.group(2)) diff --git a/ansible_base/lib/utils/settings.py b/ansible_base/lib/utils/settings.py index b79d8a93e..010427785 100644 --- a/ansible_base/lib/utils/settings.py +++ b/ansible_base/lib/utils/settings.py @@ -1,10 +1,11 @@ -import importlib import logging +import warnings from typing import Any from django.conf import settings from django.utils.translation import gettext_lazy as _ +from ansible_base.lib.utils.imports import import_object from ansible_base.lib.utils.validation import to_python_boolean logger = logging.getLogger('ansible_base.lib.utils.settings') @@ -41,10 +42,8 @@ def get_function_from_setting(setting_name: str) -> Any: return None try: - module_name, _junk, function_name = setting.rpartition('.') - the_function = getattr(importlib.import_module(module_name), function_name) - return the_function - except Exception: + return import_object(setting) + except (ValueError, ImportError, AttributeError): logger.exception(_('{setting_name} was set but we were unable to import its reference as a function.').format(setting_name=setting_name)) return None @@ -68,9 +67,26 @@ def override_setting(*args, **kwargs): def get_from_import(module_name, attr): - "Thin wrapper around importlib.import_module, mostly exists so that we can safely mock this in tests" - module = importlib.import_module(module_name, package=attr) - return getattr(module, attr) + """ + Thin wrapper around import_object. + + .. deprecated:: + This function is deprecated and will be removed in a future version. + Use :func:`ansible_base.lib.utils.imports.import_object` directly instead. + + Args: + module_name: The module path to import from + attr: The attribute name to retrieve from the module + + Returns: + The imported object + """ + warnings.warn( + "get_from_import is deprecated. Use ansible_base.lib.utils.imports.import_object directly.", + DeprecationWarning, + stacklevel=2, + ) + return import_object(module_name, attr) def is_aoc_instance(): diff --git a/ansible_base/lib/utils/views/django_app_api.py b/ansible_base/lib/utils/views/django_app_api.py index 1b8d375e4..5e403e786 100644 --- a/ansible_base/lib/utils/views/django_app_api.py +++ b/ansible_base/lib/utils/views/django_app_api.py @@ -4,7 +4,7 @@ from django.utils.translation import gettext_lazy as _ from rest_framework.settings import api_settings, import_from_string -from ansible_base.lib.utils.settings import get_from_import +from ansible_base.lib.utils.imports import import_object from ansible_base.lib.utils.views.ansible_base import AnsibleBaseView logger = logging.getLogger('ansible_base.lib.utils.views.django_app_api') @@ -21,7 +21,7 @@ if not module_name or not class_name: logger.error(_("ANSIBLE_BASE_CUSTOM_VIEW_PARENT must be in the format package.subpackage.view, defaulting to AnsibleBaseView")) else: - parent_view_class = get_from_import(module_name, class_name) + parent_view_class = import_object(module_name, class_name) except ModuleNotFoundError: logger.error(_("Failed to find parent view class {parent_view}, defaulting to AnsibleBaseView".format(parent_view=parent_view))) except ImportError: diff --git a/test_app/tests/authentication/authenticator_plugins/test_local.py b/test_app/tests/authentication/authenticator_plugins/test_local.py index ea59578e2..a6bd738ae 100644 --- a/test_app/tests/authentication/authenticator_plugins/test_local.py +++ b/test_app/tests/authentication/authenticator_plugins/test_local.py @@ -1,8 +1,6 @@ from unittest import mock import pytest -import requests -from django.contrib.auth import get_user_model from django.test.client import RequestFactory from ansible_base.authentication.authenticator_plugins.local import AuthenticatorPlugin @@ -12,35 +10,6 @@ authenticated_test_page = "authenticator-list" -def mock_get_setting(setting_name): - """Helper function to mock get_setting with appropriate values for different settings.""" - if setting_name == 'gateway_proxy_url': - return 'http://controller.example.com' - elif setting_name == 'GRPC_SERVER_AUTH_SERVICE_TIMEOUT': - return '30s' # Return a valid duration format - else: - return None - - -@pytest.fixture -def controller_auth_mocks(user): - """ - Helper fixture that provides common mocks for controller authentication tests. - Returns a context manager that sets up the basic mocks needed for controller auth. - """ - from contextlib import contextmanager - - @contextmanager - def mock_controller_auth(): - with ( - mock.patch.object(user, 'use_controller_password', True, create=True), - mock.patch('ansible_base.authentication.authenticator_plugins.local.UserModel._default_manager.get_by_natural_key', return_value=user), - ): - yield - - return mock_controller_auth - - @mock.patch("rest_framework.views.APIView.authentication_classes", [SessionAuthentication]) def test_local_auth_successful(unauthenticated_api_client, local_authenticator, user): """ @@ -81,13 +50,18 @@ def test_local_auth_failure(unauthenticated_api_client, local_authenticator, use "configuration, expected_status_code", [ ('{}', 201), + ('{"fallback_authentication": ["aap_gateway_api.authentication.fallbacks.controller"]}', 201), ('{"anything": "here"}', 400), + ('{"fallback_authentication": "not_a_list"}', 400), + ('{"fallback_authentication": [123]}', 400), # Not strings + ('{"fallback_authentication": ["nodots"]}', 400), # Invalid module path format (no dots) + ('{"fallback_authentication": ["has spaces"]}', 400), # Invalid module path with spaces ], ) -def test_local_auth_create_configuration_must_be_empty(admin_api_client, configuration, expected_status_code, shut_up_logging): +def test_local_auth_create_configuration_validates_properly(admin_api_client, configuration, expected_status_code, shut_up_logging): """ - Attempt to create a local authenticator with invalid configuration and test - that it fails. + Attempt to create a local authenticator with various configurations and test + that it validates properly. """ url = get_relative_url("authenticator-list") data = { @@ -103,14 +77,44 @@ def test_local_auth_create_configuration_must_be_empty(admin_api_client, configu def test_local_auth_configuration_validate(): - # Technically if you try to add anything the validator should report this as an invalid field but lets force the issue - from django.core.exceptions import ValidationError + from rest_framework.serializers import ValidationError from ansible_base.authentication.authenticator_plugins.local import LocalConfiguration config = LocalConfiguration() - with pytest.raises(ValidationError): - config.validate({'something', 'here'}) + + # Valid: empty configuration + result = config.validate({}) + assert result == {} + + # Valid: with fallback_authentication + result = config.validate({'fallback_authentication': ['module.path.to.fallback']}) + assert result['fallback_authentication'] == ['module.path.to.fallback'] + + # Valid: multiple fallbacks + result = config.validate({'fallback_authentication': ['module.path.one', 'module.path.two']}) + assert len(result['fallback_authentication']) == 2 + + # Invalid: fallback_authentication is not a list (test via serializer) + with pytest.raises(ValidationError) as exc_info: + # This will fail during serialization before validate() is called + config.to_internal_value({'fallback_authentication': 'not_a_list'}) + assert 'fallback_authentication' in str(exc_info.value) + + # Invalid: fallback_authentication contains non-strings + with pytest.raises(ValidationError) as exc_info: + config.validate({'fallback_authentication': [123]}) + assert 'fallback_authentication' in str(exc_info.value) + + # Invalid: fallback_authentication contains path without dots + with pytest.raises(ValidationError) as exc_info: + config.validate({'fallback_authentication': ['nodots']}) + assert 'fallback_authentication' in str(exc_info.value) + + # Invalid: fallback_authentication contains path with invalid characters + with pytest.raises(ValidationError) as exc_info: + config.validate({'fallback_authentication': ['invalid path!']}) + assert 'fallback_authentication' in str(exc_info.value) def test_local_auth_instance_not_enabled(local_authenticator, expected_log): @@ -130,918 +134,511 @@ def test_local_auth_no_db_instance(): assert plugin.authenticate(request=RequestFactory(), username='jane', password='doe') is None -@pytest.mark.django_db() -def test_can_authenticate_from_controller_nonexistent_user(): - """ - Test that _can_authenticate_from_controller returns False for a non-existent user. - """ - plugin = AuthenticatorPlugin() - result = plugin._can_authenticate_from_controller("nonexistent_user", "password") - assert result is False +# ============================================================================ +# Tests for fallback authentication functionality +# ============================================================================ -@pytest.mark.django_db() -def test_can_authenticate_from_controller_success(user, controller_auth_mocks): - """ - Test that _can_authenticate_from_controller returns True when all conditions are met. - """ - plugin = AuthenticatorPlugin() - - # Set user password to encrypted (indicating partial migration) - user.password = "$encrypted$" - user.save() +class MockFallbackAuthenticator: + """Mock fallback authenticator for testing.""" - # Use fixture for common mocks and add specific controller user response - with controller_auth_mocks(), mock.patch.object(plugin, '_get_controller_user', return_value={"ldap_dn": "", "password": "$encrypted$"}): - result = plugin._can_authenticate_from_controller(user.username, "password") - assert result is True + def __init__(self, database_instance=None, configuration=None): + self.database_instance = database_instance + self.configuration = configuration or {} + self.authenticate_called = False + def authenticate(self, request, username, password, **kwargs): + self.authenticate_called = True + return None # Return None by default, override in tests -@pytest.mark.django_db() -def test_can_authenticate_from_controller_no_controller_user(user): - """ - Test that _can_authenticate_from_controller returns False when controller user not found. - """ - plugin = AuthenticatorPlugin() - # Mock the controller user response to return None - with mock.patch.object(plugin, '_get_controller_user', return_value=None): - result = plugin._can_authenticate_from_controller(user.username, "password") - assert result is False +class MockSuccessfulFallback(MockFallbackAuthenticator): + """Mock fallback that returns a user.""" + def authenticate(self, request, username, password, **kwargs): + self.authenticate_called = True + # Return a mock user + from test_app.models import User -@pytest.mark.django_db() -def test_can_authenticate_from_controller_invalid_format(user): - """ - Test that _can_authenticate_from_controller returns False when controller user format is invalid. - """ - plugin = AuthenticatorPlugin() + return User.objects.get(username=username) - # Mock the controller user response to return invalid format (False) - with mock.patch.object(plugin, '_get_controller_user', return_value=False): - result = plugin._can_authenticate_from_controller(user.username, "password") - assert result is False +class MockFailingFallback(MockFallbackAuthenticator): + """Mock fallback that always fails.""" -@pytest.mark.django_db() -def test_can_authenticate_from_controller_missing_ldap_dn_with_encrypted_password(user): - """ - Test that _can_authenticate_from_controller returns False when ldap_dn is missing and password is encrypted. - """ - plugin = AuthenticatorPlugin() - - # Set user password to encrypted (indicating partial migration) - user.password = "$encrypted$" - user.save() - - # Mock the controller user response without ldap_dn - with mock.patch.object(plugin, '_get_controller_user', return_value={"username": "testuser"}): - result = plugin._can_authenticate_from_controller(user.username, "password") - assert result is False - - -@pytest.mark.django_db() -def test_can_authenticate_from_controller_non_local_user_with_encrypted_password(user): - """ - Test that _can_authenticate_from_controller returns False when user is not local and password is encrypted. - """ - plugin = AuthenticatorPlugin() - - # Set user password to encrypted (indicating partial migration) - user.password = "$encrypted$" - user.save() - - # Mock the controller user response with non-empty ldap_dn - with mock.patch.object(plugin, '_get_controller_user', return_value={"ldap_dn": "cn=testuser,ou=users,dc=example,dc=com"}): - result = plugin._can_authenticate_from_controller(user.username, "password") - assert result is False - - -@pytest.mark.django_db() -def test_can_authenticate_from_controller_non_local_user_with_regular_password(user): - """ - Test that _can_authenticate_from_controller returns False when user is not local (has ldap_dn). - """ - plugin = AuthenticatorPlugin() - - # Set user password to regular password (not encrypted) - user.set_password("regular_password") - user.save() + def authenticate(self, request, username, password, **kwargs): + self.authenticate_called = True + return None - # Mock the controller user response with non-empty ldap_dn (LDAP user) - with mock.patch.object(plugin, '_get_controller_user', return_value={"ldap_dn": "cn=testuser,ou=users,dc=example,dc=com", "password": "regular_password"}): - result = plugin._can_authenticate_from_controller(user.username, "password") - assert result is False +class MockExceptionFallback(MockFallbackAuthenticator): + """Mock fallback that raises an exception.""" -@pytest.mark.django_db() -def test_get_controller_user_success(user): - """ - Test that _get_controller_user returns user data when successful. - """ - plugin = AuthenticatorPlugin() + def authenticate(self, request, username, password, **kwargs): + raise Exception("Fallback error") - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting): - with mock.patch('requests.get') as mock_get: - mock_response = mock.Mock() - mock_response.raise_for_status.return_value = None - mock_response.json.return_value = {"count": 1, "results": [{"ldap_dn": ""}]} - mock_get.return_value = mock_response - result = plugin._get_controller_user(user.username, "password") - assert result == {"ldap_dn": ""} +# ============================================================================ +# Tests for _load_fallback_plugin() +# ============================================================================ -@pytest.mark.django_db() -def test_get_controller_user_no_gateway_proxy_url(user): - """ - Test that _get_controller_user returns None when gateway_proxy_url is not set. - """ - plugin = AuthenticatorPlugin() +class TestLoadFallbackPlugin: + """Tests for the plugin loading mechanism via import_object.""" - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', return_value=None): - result = plugin._get_controller_user(user.username, "password") - assert result is None + def test_load_valid_plugin(self, local_authenticator): + """Test loading a valid fallback plugin via import_object.""" + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + # Mock import_object to return our mock class + with mock.patch('ansible_base.authentication.authenticator_plugins.local.import_object', return_value=MockFallbackAuthenticator): + # This tests that import_object is called correctly in _try_fallback_authenticators + local_authenticator.configuration = {'fallback_authentication': ['test.module.path']} + local_authenticator.save() -@pytest.mark.django_db() -def test_get_controller_user_http_error(user): - """ - Test that _get_controller_user returns None when HTTP error occurs. - """ - plugin = AuthenticatorPlugin() - - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting): - with mock.patch('requests.get') as mock_get: - mock_get.side_effect = requests.exceptions.HTTPError("HTTP Error") - - result = plugin._get_controller_user(user.username, "password") + # We can't directly test the removed method, but we can test the integration + result = plugin._try_fallback_authenticators(None, 'testuser', 'password') + # Should attempt to use the fallback (which will return None since MockFallbackAuthenticator returns None) assert result is None + def test_load_plugin_import_error(self, local_authenticator): + """Test handling of ImportError when loading plugin.""" + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + local_authenticator.configuration = {'fallback_authentication': ['nonexistent.module']} + local_authenticator.save() -@pytest.mark.django_db() -def test_get_controller_user_invalid_count(user): - """ - Test that _get_controller_user returns None when count is invalid. - """ - plugin = AuthenticatorPlugin() - - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting): - with mock.patch('requests.get') as mock_get: - mock_response = mock.Mock() - mock_response.raise_for_status.return_value = None - mock_response.json.return_value = {"count": 0} # Invalid count - mock_get.return_value = mock_response - - result = plugin._get_controller_user(user.username, "password") + with mock.patch('ansible_base.authentication.authenticator_plugins.local.import_object', side_effect=ImportError("Module not found")): + # Should log error but not raise, continuing to try other fallbacks + result = plugin._try_fallback_authenticators(None, 'testuser', 'password') assert result is None + def test_load_plugin_attribute_error(self, local_authenticator): + """Test handling when module doesn't have FallbackAuthenticator class.""" + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + local_authenticator.configuration = {'fallback_authentication': ['test.module.path']} + local_authenticator.save() -@pytest.mark.django_db() -def test_get_controller_user_empty_results(user): - """ - Test that _get_controller_user returns None when results are empty. - """ - plugin = AuthenticatorPlugin() - - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting): - with mock.patch('requests.get') as mock_get: - mock_response = mock.Mock() - mock_response.raise_for_status.return_value = None - mock_response.json.return_value = {"count": 1, "results": []} - mock_get.return_value = mock_response - - result = plugin._get_controller_user(user.username, "password") + with mock.patch('ansible_base.authentication.authenticator_plugins.local.import_object', side_effect=AttributeError("FallbackAuthenticator not found")): + # Should log error but not raise, continuing to try other fallbacks + result = plugin._try_fallback_authenticators(None, 'testuser', 'password') assert result is None -def test_authenticate_with_controller_validation_success(user, local_authenticator): - """ - Test that authentication works when controller validation succeeds. - """ - from ansible_base.authentication.models import AuthenticatorUser - - # Create an AuthenticatorUser entry for the user with local authenticator - AuthenticatorUser.objects.create(uid=user.username, user=user, provider=local_authenticator) - - plugin = AuthenticatorPlugin(database_instance=local_authenticator) - - # Mock controller authentication to succeed - with mock.patch.object(plugin, '_can_authenticate_from_controller', return_value=True): - # Mock the super().authenticate to return None first (simulating initial auth failure) - # then return the user on the second call (after password update) - with mock.patch('django.contrib.auth.backends.ModelBackend.authenticate') as mock_auth: - mock_auth.side_effect = [None, user] # First call returns None, second returns user - - with mock.patch.object(plugin, 'update_gateway_user') as mock_update: - request = RequestFactory().get('/api/gateway/v1/login/') - result = plugin.authenticate(request=request, username=user.username, password="password") - - assert result is not None - assert result == user - mock_update.assert_called_once_with(user.username, "password") - - -def test_authenticate_non_gateway_path_skips_validation(user, local_authenticator): - """ - Test that controller validation is skipped when request path doesn't start with /api/gateway/v1/login/. - """ - from ansible_base.authentication.models import AuthenticatorUser - - # Create an AuthenticatorUser entry for the user with local authenticator - AuthenticatorUser.objects.create(uid=user.username, user=user, provider=local_authenticator) - - plugin = AuthenticatorPlugin(database_instance=local_authenticator) - - # Create a request with different path - request = RequestFactory().get('/some/other/path/') +# ============================================================================ +# Tests for _try_fallback_authenticators() +# ============================================================================ - with mock.patch.object(plugin, '_can_authenticate_from_controller', return_value=False) as mock_check: - with mock.patch('django.contrib.auth.backends.ModelBackend.authenticate', return_value=None): - with mock.patch.object(plugin, 'update_gateway_user') as mock_update: - result = plugin.authenticate(request=request, username=user.username, password="password") - - # _can_authenticate_from_controller is not called because path doesn't match - mock_check.assert_not_called() - # But update_gateway_user should not be called because path doesn't match - mock_update.assert_not_called() - assert result is None - - -def test_update_gateway_user(user): - """ - Test that update_gateway_user correctly updates the user's password. - """ - plugin = AuthenticatorPlugin() - original_password = user.password - - plugin.update_gateway_user(user.username, "new_password") - - # Refresh user from database - user.refresh_from_db() - assert user.password != original_password - assert user.check_password("new_password") - - -def test_authenticate_logs_warning_after_controller_validation(user, local_authenticator, expected_log): - """ - Test that authenticate logs a warning after successful controller validation. - """ - from ansible_base.authentication.models import AuthenticatorUser - # Create an AuthenticatorUser entry for the user with local authenticator - AuthenticatorUser.objects.create(uid=user.username, user=user, provider=local_authenticator) +class TestTryFallbackAuthenticators: + """Tests for the fallback orchestration logic.""" - plugin = AuthenticatorPlugin(database_instance=local_authenticator) - - # Mock controller authentication to succeed - with mock.patch.object(plugin, '_can_authenticate_from_controller', return_value=True): - # Mock the super().authenticate to return None first, then return the user - with mock.patch('django.contrib.auth.backends.ModelBackend.authenticate') as mock_auth: - mock_auth.side_effect = [None, user] # First call returns None, second returns user - - with mock.patch.object(plugin, 'update_gateway_user') as mock_update: - with expected_log('ansible_base.authentication.authenticator_plugins.local.logger', "warning", "User has been validated by controller"): - # Create request with gateway login path - request = RequestFactory().get('/api/gateway/v1/login/') - result = plugin.authenticate(request=request, username=user.username, password="password") - - assert result is not None - assert result == user - mock_update.assert_called_once_with(user.username, "password") - - -# Logging tests for _can_authenticate_from_controller -@pytest.mark.django_db() -def test_can_authenticate_from_controller_logs_warning_for_nonexistent_user(expected_log): - """ - Test that _can_authenticate_from_controller logs a warning for non-existent users. - """ - plugin = AuthenticatorPlugin() - - with expected_log('ansible_base.authentication.authenticator_plugins.local.logger', "warning", "does not exist in the database"): - result = plugin._can_authenticate_from_controller("nonexistent_user", "password") - assert result is False - - -@pytest.mark.django_db() -def test_can_authenticate_from_controller_logs_warning_invalid_format(user, expected_log): - """ - Test that _can_authenticate_from_controller logs a warning for invalid controller user format. - """ - plugin = AuthenticatorPlugin() + @pytest.fixture + def mock_request(self): + """Create a mock request.""" + return RequestFactory().post('/login') - # Mock use_controller_password to True to enable controller authentication - mock_response = mock.Mock() - mock_response.raise_for_status.return_value = None - mock_response.json.return_value = {"count": 1, "results": ["invalid_string_not_dict"]} + def test_no_fallbacks_configured(self, local_authenticator, mock_request): + """Test when no fallback authenticators are configured.""" + local_authenticator.configuration = {} + local_authenticator.save() - with ( - mock.patch.object(user, 'use_controller_password', True, create=True), - mock.patch('ansible_base.authentication.authenticator_plugins.local.UserModel._default_manager.get_by_natural_key', return_value=user), - mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting), - mock.patch('requests.get', return_value=mock_response), - expected_log('ansible_base.authentication.authenticator_plugins.local.logger', "warning", "user was not a dictionary"), - ): - result = plugin._can_authenticate_from_controller(user.username, "password") - assert result is False - - -@pytest.mark.django_db() -def test_can_authenticate_from_controller_logs_warning_not_local_user(user, expected_log): - """ - Test that _can_authenticate_from_controller logs a warning when user cannot be confirmed as local. - """ - plugin = AuthenticatorPlugin() - - # Set user password to encrypted (indicating partial migration) - user.password = "$encrypted$" - user.save() - - # Mock use_controller_password to True to enable controller authentication - with ( - mock.patch.object(user, 'use_controller_password', True, create=True), - mock.patch('ansible_base.authentication.authenticator_plugins.local.UserModel._default_manager.get_by_natural_key', return_value=user), - mock.patch.object(plugin, '_get_controller_user', return_value={"ldap_dn": "cn=user,dc=example,dc=com", "password": "$encrypted$"}), - expected_log('ansible_base.authentication.authenticator_plugins.local.logger', "warning", "is an ldap user and can not be authenticated"), - ): - result = plugin._can_authenticate_from_controller(user.username, "password") - assert result is False + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + result = plugin._try_fallback_authenticators(mock_request, 'testuser', 'password') + assert result is None -@pytest.mark.django_db() -def test_authenticate_logs_fallback_condition_not_met(user, local_authenticator, expected_log): - """ - Test that authenticate logs when fallback authentication condition is not met. - """ - from ansible_base.authentication.models import AuthenticatorUser + def test_empty_fallbacks_list(self, local_authenticator, mock_request): + """Test when fallback_authentication is an empty list.""" + local_authenticator.configuration = {'fallback_authentication': []} + local_authenticator.save() - # Create an AuthenticatorUser entry for the user with local authenticator - AuthenticatorUser.objects.create(uid=user.username, user=user, provider=local_authenticator) + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + result = plugin._try_fallback_authenticators(mock_request, 'testuser', 'password') - plugin = AuthenticatorPlugin(database_instance=local_authenticator) + assert result is None - # Mock regular authentication to fail - with mock.patch('django.contrib.auth.backends.ModelBackend.authenticate', return_value=None): - # Mock _can_authenticate_from_controller to return False so condition fails - with mock.patch.object(plugin, '_can_authenticate_from_controller', return_value=False) as mock_check: - with expected_log('ansible_base.authentication.authenticator_plugins.local.logger', "info", "Fallback authentication condition not met"): - # Create request with gateway login path but controller auth will fail - request = RequestFactory().get('/api/gateway/v1/login/') - result = plugin.authenticate(request=request, username=user.username, password="password") + def test_single_successful_fallback(self, local_authenticator, mock_request, user): + """Test successful authentication with a single fallback.""" + local_authenticator.configuration = {'fallback_authentication': ['test.fallback.mock']} + local_authenticator.save() - # Verify the method was called and returned None - mock_check.assert_called_once_with(user.username, "password") - assert result is None + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + with mock.patch('ansible_base.authentication.authenticator_plugins.local.import_object', return_value=MockSuccessfulFallback): + result = plugin._try_fallback_authenticators(mock_request, user.username, 'password') -@pytest.mark.django_db() -def test_get_controller_user_logs_warning_for_invalid_count(user, expected_log): - """ - Test that _get_controller_user logs a warning for invalid count. - """ - plugin = AuthenticatorPlugin() + assert result == user - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting): - with mock.patch('requests.get') as mock_get: - mock_response = mock.Mock() - mock_response.raise_for_status.return_value = None - mock_response.json.return_value = {"count": 0} - mock_get.return_value = mock_response + def test_single_failing_fallback(self, local_authenticator, mock_request): + """Test when single fallback fails to authenticate.""" + local_authenticator.configuration = {'fallback_authentication': ['test.fallback.mock']} + local_authenticator.save() - with expected_log('ansible_base.authentication.authenticator_plugins.local.logger', "warning", "Unable to authenticate user"): - result = plugin._get_controller_user(user.username, "password") - assert result is None + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + with mock.patch('ansible_base.authentication.authenticator_plugins.local.import_object', return_value=MockFailingFallback): + result = plugin._try_fallback_authenticators(mock_request, 'testuser', 'password') -@pytest.mark.django_db() -def test_get_controller_user_logs_warning_for_empty_results(user, expected_log): - """ - Test that _get_controller_user logs a warning for empty results. - """ - plugin = AuthenticatorPlugin() + assert result is None - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting): - with mock.patch('requests.get') as mock_get: - mock_response = mock.Mock() - mock_response.raise_for_status.return_value = None - mock_response.json.return_value = {"count": 1, "results": []} - mock_get.return_value = mock_response + def test_multiple_fallbacks_first_succeeds(self, local_authenticator, mock_request, user): + """Test multiple fallbacks where the first one succeeds.""" + local_authenticator.configuration = {'fallback_authentication': ['test.fallback.success', 'test.fallback.never_called']} + local_authenticator.save() - with expected_log('ansible_base.authentication.authenticator_plugins.local.logger', "info", "Invalid or empty results"): - result = plugin._get_controller_user(user.username, "password") - assert result is None + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + first_fallback = MockSuccessfulFallback() + second_fallback = MockFallbackAuthenticator() + def mock_load(path, attr): + if 'success' in path: + return lambda *args, **kwargs: first_fallback + return lambda *args, **kwargs: second_fallback -# Comprehensive update_gateway_user tests -@pytest.mark.django_db() -def test_update_gateway_user_nonexistent_user(): - """ - Test that update_gateway_user raises exception for non-existent user. - """ - plugin = AuthenticatorPlugin() - UserModel = get_user_model() + with mock.patch('ansible_base.authentication.authenticator_plugins.local.import_object', side_effect=mock_load): + result = plugin._try_fallback_authenticators(mock_request, user.username, 'password') - with pytest.raises(UserModel.DoesNotExist): - plugin.update_gateway_user("nonexistent_user", "password") + assert result == user + assert first_fallback.authenticate_called + assert not second_fallback.authenticate_called # Should not be tried + def test_multiple_fallbacks_second_succeeds(self, local_authenticator, mock_request, user): + """Test multiple fallbacks where the second one succeeds.""" + local_authenticator.configuration = {'fallback_authentication': ['test.fallback.fails', 'test.fallback.success']} + local_authenticator.save() -@pytest.mark.django_db() -def test_update_gateway_user_logs_success(user, expected_log): - """ - Test that update_gateway_user logs success message. - """ - plugin = AuthenticatorPlugin() + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + first_fallback = MockFailingFallback() + second_fallback = MockSuccessfulFallback() - with expected_log('ansible_base.authentication.authenticator_plugins.local.logger', "info", f"Updated user {user.username} gateway account"): - plugin.update_gateway_user(user.username, "new_password") + def mock_load(path, attr): + if 'fails' in path: + return lambda *args, **kwargs: first_fallback + return lambda *args, **kwargs: second_fallback - # Verify changes - user.refresh_from_db() - assert user.check_password("new_password") + with mock.patch('ansible_base.authentication.authenticator_plugins.local.import_object', side_effect=mock_load): + result = plugin._try_fallback_authenticators(mock_request, user.username, 'password') + assert result == user + assert first_fallback.authenticate_called + assert second_fallback.authenticate_called -@pytest.mark.django_db() -def test_update_gateway_user_updates_resource_flag(user): - """ - Test that update_gateway_user properly updates the user password. - """ - plugin = AuthenticatorPlugin() + def test_all_fallbacks_fail(self, local_authenticator, mock_request): + """Test when all configured fallbacks fail.""" + local_authenticator.configuration = {'fallback_authentication': ['test.fallback.fails1', 'test.fallback.fails2', 'test.fallback.fails3']} + local_authenticator.save() - plugin.update_gateway_user(user.username, "new_password") + plugin = AuthenticatorPlugin(database_instance=local_authenticator) - # Verify password was updated - user.refresh_from_db() - assert user.check_password("new_password") + with mock.patch('ansible_base.authentication.authenticator_plugins.local.import_object', return_value=MockFailingFallback): + result = plugin._try_fallback_authenticators(mock_request, 'testuser', 'password') + assert result is None -# Authentication flow edge cases -@pytest.mark.django_db() -def test_authenticate_regular_auth_success_skips_controller_validation(user, local_authenticator): - """ - Test that authentication skips controller validation when regular authentication succeeds. - """ - from ansible_base.authentication.models import AuthenticatorUser + def test_fallback_import_error_continues(self, local_authenticator, mock_request, user, expected_log): + """Test that ImportError in one fallback doesn't stop others.""" + local_authenticator.configuration = {'fallback_authentication': ['test.fallback.bad_import', 'test.fallback.success']} + local_authenticator.save() - # Create an AuthenticatorUser entry for the user with local authenticator - AuthenticatorUser.objects.create(uid=user.username, user=user, provider=local_authenticator) + plugin = AuthenticatorPlugin(database_instance=local_authenticator) - plugin = AuthenticatorPlugin(database_instance=local_authenticator) + def mock_load(path, attr): + if 'bad_import' in path: + raise ImportError("Module not found") + return MockSuccessfulFallback - with mock.patch.object(plugin, '_can_authenticate_from_controller', return_value=False) as mock_check: - with mock.patch('django.contrib.auth.backends.ModelBackend.authenticate', return_value=user): - with mock.patch.object(plugin, 'update_gateway_user') as mock_update: - # Create request with gateway login path - request = RequestFactory().get('/api/gateway/v1/login/') - result = plugin.authenticate(request=request, username=user.username, password="password") + with expected_log('ansible_base.authentication.authenticator_plugins.local.logger', 'error', 'Failed to load fallback authenticator plugin'): + with mock.patch('ansible_base.authentication.authenticator_plugins.local.import_object', side_effect=mock_load): + result = plugin._try_fallback_authenticators(mock_request, user.username, 'password') - # _can_authenticate_from_controller should not be called since regular auth succeeded - mock_check.assert_not_called() - # update_gateway_user should not be called since regular auth succeeded - mock_update.assert_not_called() - assert result is not None + assert result == user + def test_fallback_attribute_error_continues(self, local_authenticator, mock_request, user, expected_log): + """Test that AttributeError in one fallback doesn't stop others.""" + local_authenticator.configuration = {'fallback_authentication': ['test.fallback.bad_class', 'test.fallback.success']} + local_authenticator.save() -# Test missing parameters and edge cases in _get_controller_user -@pytest.mark.django_db() -def test_get_controller_user_missing_count_field(user): - """ - Test that _get_controller_user handles missing count field. - """ - plugin = AuthenticatorPlugin() + plugin = AuthenticatorPlugin(database_instance=local_authenticator) - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting): - with mock.patch('requests.get') as mock_get: - mock_response = mock.Mock() - mock_response.raise_for_status.return_value = None - mock_response.json.return_value = {"results": [{"ldap_dn": ""}]} # Missing count - mock_get.return_value = mock_response + def mock_load(path, attr): + if 'bad_class' in path: + raise AttributeError("FallbackAuthenticator not found") + return MockSuccessfulFallback - result = plugin._get_controller_user(user.username, "password") - assert result is None + with expected_log('ansible_base.authentication.authenticator_plugins.local.logger', 'error', 'Failed to load fallback authenticator plugin'): + with mock.patch('ansible_base.authentication.authenticator_plugins.local.import_object', side_effect=mock_load): + result = plugin._try_fallback_authenticators(mock_request, user.username, 'password') + assert result == user -@pytest.mark.django_db() -def test_get_controller_user_non_list_results(user): - """ - Test that _get_controller_user handles non-list results field. - """ - plugin = AuthenticatorPlugin() + def test_fallback_runtime_exception_continues(self, local_authenticator, mock_request, user, expected_log): + """Test that runtime exception in one fallback doesn't stop others.""" + local_authenticator.configuration = {'fallback_authentication': ['test.fallback.exception', 'test.fallback.success']} + local_authenticator.save() - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting): - with mock.patch('requests.get') as mock_get: - mock_response = mock.Mock() - mock_response.raise_for_status.return_value = None - mock_response.json.return_value = {"count": 1, "results": "not_a_list"} - mock_get.return_value = mock_response + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + exception_fallback = MockExceptionFallback() + success_fallback = MockSuccessfulFallback() - result = plugin._get_controller_user(user.username, "password") - assert result is None + def mock_load(path, attr): + if 'exception' in path: + return lambda *args, **kwargs: exception_fallback + return lambda *args, **kwargs: success_fallback + with expected_log('ansible_base.authentication.authenticator_plugins.local.logger', 'error', 'Error in fallback authenticator'): + with mock.patch('ansible_base.authentication.authenticator_plugins.local.import_object', side_effect=mock_load): + result = plugin._try_fallback_authenticators(mock_request, user.username, 'password') -# Test successful authentication flow with all components -@pytest.mark.django_db() -def test_authenticate_successful_controller_validation_full_flow(user, local_authenticator): - """ - Test complete successful authentication flow with controller validation. - """ - from ansible_base.authentication.models import AuthenticatorUser - - # Create an AuthenticatorUser entry for the user with local authenticator - AuthenticatorUser.objects.create(uid=user.username, user=user, provider=local_authenticator) - - plugin = AuthenticatorPlugin(database_instance=local_authenticator) - - # Set user password to encrypted (indicating partial migration) - user.password = "$encrypted$" - user.save() - - # Mock use_controller_password to True to enable controller authentication - with mock.patch.object(user, 'use_controller_password', True, create=True): - # Mock the database lookup to return our mocked user - with mock.patch('ansible_base.authentication.authenticator_plugins.local.UserModel._default_manager.get_by_natural_key', return_value=user): - # Mock all the components for a successful flow - with ( - mock.patch.object(plugin, '_get_controller_user', return_value={"ldap_dn": "", "password": "$encrypted$"}), - mock.patch('django.contrib.auth.backends.ModelBackend.authenticate') as mock_auth, - mock.patch.object(plugin, 'update_gateway_user') as mock_update, - ): - mock_auth.side_effect = [None, user] # First call fails, second succeeds after password update - - # Create request with gateway login path - request = RequestFactory().get('/api/gateway/v1/login/') - result = plugin.authenticate(request=request, username=user.username, password="password") - - # Verify successful authentication - assert result is not None assert result == user - # Verify update_gateway_user was called - mock_update.assert_called_once_with(user.username, "password") + def test_fallback_instantiation(self, local_authenticator, mock_request): + """Test that fallback authenticators are instantiated correctly.""" + test_config = {'fallback_authentication': ['test.fallback.mock']} + local_authenticator.configuration = test_config + local_authenticator.save() + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + instantiated = [] -# Test connection and timeout errors -@pytest.mark.django_db() -def test_get_controller_user_connection_error(user): - """ - Test that _get_controller_user handles connection errors gracefully. - """ - plugin = AuthenticatorPlugin() + class InstantiationCapturingFallback: + def __init__(self): + instantiated.append(True) - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting): - with mock.patch('requests.get') as mock_get: - mock_get.side_effect = requests.exceptions.ConnectionError("Connection failed") + def authenticate(self, request, username, password, **kwargs): + return None - result = plugin._get_controller_user(user.username, "password") - assert result is None + with mock.patch('ansible_base.authentication.authenticator_plugins.local.import_object', return_value=InstantiationCapturingFallback): + plugin._try_fallback_authenticators(mock_request, 'testuser', 'password') + # Verify the fallback was instantiated + assert len(instantiated) == 1 -@pytest.mark.django_db() -def test_get_controller_user_timeout_error(user): - """ - Test that _get_controller_user handles timeout errors gracefully. - """ - plugin = AuthenticatorPlugin() - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting): - with mock.patch('requests.get') as mock_get: - mock_get.side_effect = requests.exceptions.Timeout("Request timed out") +# ============================================================================ +# Tests for authenticate() integration +# ============================================================================ - result = plugin._get_controller_user(user.username, "password") - assert result is None +class TestAuthenticateIntegration: + """Tests for the main authenticate() method with fallback support.""" -@pytest.mark.django_db() -def test_get_controller_user_general_request_exception(user): - """ - Test that _get_controller_user handles general request exceptions gracefully. - """ - plugin = AuthenticatorPlugin() + @pytest.fixture + def mock_request(self): + """Create a mock request.""" + return RequestFactory().post('/login') - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting): - with mock.patch('requests.get') as mock_get: - mock_get.side_effect = requests.exceptions.RequestException("General request error") + def test_successful_primary_auth_skips_fallback(self, local_authenticator, user, mock_request): + """Test that successful primary authentication doesn't try fallbacks.""" + local_authenticator.configuration = {'fallback_authentication': ['test.fallback.mock']} + local_authenticator.save() - result = plugin._get_controller_user(user.username, "password") - assert result is None + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + fallback = MockFallbackAuthenticator() + # Mock super().authenticate() to succeed + with mock.patch('ansible_base.authentication.authenticator_plugins.local.import_object', return_value=lambda *a, **k: fallback): + with mock.patch('django.contrib.auth.backends.ModelBackend.authenticate', return_value=user): + result = plugin.authenticate(mock_request, user.username, 'password') -# Test JSON parsing error handling -@pytest.mark.django_db() -def test_get_controller_user_json_decode_error(user): - """ - Test that _get_controller_user handles JSON decode errors gracefully. - """ - plugin = AuthenticatorPlugin() + assert result == user + assert not fallback.authenticate_called # Fallback should not be tried - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting): - with mock.patch('requests.get') as mock_get: - mock_response = mock.Mock() - mock_response.raise_for_status.return_value = None - mock_response.json.side_effect = ValueError("Invalid JSON") - mock_get.return_value = mock_response + def test_failed_primary_auth_tries_fallback(self, local_authenticator, user, mock_request): + """Test that failed primary authentication tries fallbacks.""" + local_authenticator.configuration = {'fallback_authentication': ['test.fallback.success']} + local_authenticator.save() - result = plugin._get_controller_user(user.username, "password") - assert result is None + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + # Mock super().authenticate() to fail + with mock.patch('django.contrib.auth.backends.ModelBackend.authenticate', return_value=None): + with mock.patch.object(plugin, '_try_fallback_authenticators', return_value=user) as mock_fallback: + result = plugin.authenticate(mock_request, user.username, 'password') -# Test authentication without request object -@pytest.mark.django_db() -def test_authenticate_without_request_object(user, local_authenticator): - """ - Test authentication behavior when no request object is provided. - """ - from ansible_base.authentication.models import AuthenticatorUser + assert result == user + mock_fallback.assert_called_once_with(mock_request, user.username, 'password') - # Create an AuthenticatorUser entry for the user with local authenticator - AuthenticatorUser.objects.create(uid=user.username, user=user, provider=local_authenticator) + def test_failed_primary_and_fallback_returns_none(self, local_authenticator, mock_request): + """Test that None is returned when both primary and fallback fail.""" + local_authenticator.configuration = {'fallback_authentication': ['test.fallback.fails']} + local_authenticator.save() - plugin = AuthenticatorPlugin(database_instance=local_authenticator) + plugin = AuthenticatorPlugin(database_instance=local_authenticator) - with mock.patch.object(plugin, '_can_authenticate_from_controller', return_value=False) as mock_check: + # Mock both to fail with mock.patch('django.contrib.auth.backends.ModelBackend.authenticate', return_value=None): - with mock.patch.object(plugin, 'update_gateway_user') as mock_update: - result = plugin.authenticate(request=None, username=user.username, password="password") + with mock.patch.object(plugin, '_try_fallback_authenticators', return_value=None): + result = plugin.authenticate(mock_request, 'testuser', 'wrongpassword') - # _can_authenticate_from_controller should not be called without request - mock_check.assert_not_called() - # But update_gateway_user should not be called without request - mock_update.assert_not_called() assert result is None + @pytest.mark.parametrize( + 'username,password', + [ + (None, 'password'), + ('username', None), + (None, None), + ('', 'password'), + ('username', ''), + ], + ) + def test_missing_credentials_returns_none(self, local_authenticator, mock_request, username, password): + """Test that missing credentials return None immediately.""" + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + + result = plugin.authenticate(mock_request, username, password) -# Tests for _convert_to_seconds method -@pytest.mark.django_db() -def test_convert_to_seconds(): - """Test _convert_to_seconds with seconds unit.""" - plugin = AuthenticatorPlugin() - # Test Seconds - assert plugin._convert_to_seconds('15s') == 15 - assert plugin._convert_to_seconds('1s') == 1 - assert plugin._convert_to_seconds('0s') == 10 # Default value - # Test minutes - assert plugin._convert_to_seconds('5m') == 300 # 5 * 60 - assert plugin._convert_to_seconds('1m') == 60 - assert plugin._convert_to_seconds('10m') == 600 - # Test hours - assert plugin._convert_to_seconds('1h') == 3600 # 1 * 3600 - assert plugin._convert_to_seconds('2h') == 7200 # 2 * 3600 - assert plugin._convert_to_seconds('24h') == 86400 # 24 * 3600 - # Test days - assert plugin._convert_to_seconds('1d') == 86400 # 1 * 86400 - assert plugin._convert_to_seconds('7d') == 604800 # 7 * 86400 - assert plugin._convert_to_seconds('30d') == 2592000 # 30 * 86400 - # Test weeks - assert plugin._convert_to_seconds('1w') == 604800 # 1 * 604800 - assert plugin._convert_to_seconds('2w') == 1209600 # 2 * 604800 - assert plugin._convert_to_seconds('4w') == 2419200 # 4 * 604800 - # Test without unit - assert plugin._convert_to_seconds('30') == 30 - assert plugin._convert_to_seconds('120') == 120 - assert plugin._convert_to_seconds('0') == 10 # Default value - # Test uppercase - assert plugin._convert_to_seconds('15S') == 15 - assert plugin._convert_to_seconds('5M') == 300 - assert plugin._convert_to_seconds('1H') == 3600 - assert plugin._convert_to_seconds('1D') == 86400 - assert plugin._convert_to_seconds('1W') == 604800 - # Test invalid values - assert plugin._convert_to_seconds('invalid') == 10 # Default value - assert plugin._convert_to_seconds('xs') == 10 # Default value - assert plugin._convert_to_seconds('') == 10 # Default value - assert plugin._convert_to_seconds(None) == 10 # Default value - assert plugin._convert_to_seconds('-5s') == 10 # Default value - assert plugin._convert_to_seconds('-10') == 10 # Default value - - -# Test for enterprise user password check -@pytest.mark.django_db() -def test_can_authenticate_from_controller_enterprise_user(user, expected_log): - """ - Test that _can_authenticate_from_controller returns False for enterprise users (password != "$encrypted$"). - """ - plugin = AuthenticatorPlugin() + assert result is None - # Mock use_controller_password to True to enable controller authentication - with ( - mock.patch.object(user, 'use_controller_password', True, create=True), - mock.patch('ansible_base.authentication.authenticator_plugins.local.UserModel._default_manager.get_by_natural_key', return_value=user), - mock.patch.object(plugin, '_get_controller_user', return_value={"ldap_dn": "", "password": "regular_password"}), - expected_log('ansible_base.authentication.authenticator_plugins.local.logger', "warning", "is an enterprise user and can not be authenticated"), - ): - result = plugin._can_authenticate_from_controller(user.username, "password") - assert result is False +# ============================================================================ +# Tests for parallel execution safety +# ============================================================================ -@pytest.mark.django_db() -def test_can_authenticate_from_controller_enterprise_user_missing_password(user, expected_log): - """ - Test that _can_authenticate_from_controller returns False for enterprise users when password field is missing. - """ - plugin = AuthenticatorPlugin() - # Mock use_controller_password to True to enable controller authentication - with ( - mock.patch.object(user, 'use_controller_password', True, create=True), - mock.patch('ansible_base.authentication.authenticator_plugins.local.UserModel._default_manager.get_by_natural_key', return_value=user), - mock.patch.object(plugin, '_get_controller_user', return_value={"ldap_dn": "", "username": "testuser"}), - expected_log('ansible_base.authentication.authenticator_plugins.local.logger', "warning", "is an enterprise user and can not be authenticated"), - ): - result = plugin._can_authenticate_from_controller(user.username, "password") - assert result is False +class TestParallelExecutionSafety: + """Tests to ensure fallback authentication works correctly in parallel test execution.""" + def test_isolated_fallback_configurations(self, local_authenticator): + """Test that different authenticator instances have isolated configurations.""" + config1 = {'fallback_authentication': ['fallback.one']} + config2 = {'fallback_authentication': ['fallback.two']} -@pytest.mark.django_db() -def test_can_authenticate_from_controller_enterprise_user_none_password(user, expected_log): - """ - Test that _can_authenticate_from_controller returns False for enterprise users when password is None. - """ - plugin = AuthenticatorPlugin() + local_authenticator.configuration = config1 + local_authenticator.save() - # Mock use_controller_password to True to enable controller authentication - with ( - mock.patch.object(user, 'use_controller_password', True, create=True), - mock.patch('ansible_base.authentication.authenticator_plugins.local.UserModel._default_manager.get_by_natural_key', return_value=user), - mock.patch.object(plugin, '_get_controller_user', return_value={"ldap_dn": "", "password": None}), - expected_log('ansible_base.authentication.authenticator_plugins.local.logger', "warning", "is an enterprise user and can not be authenticated"), - ): - result = plugin._can_authenticate_from_controller(user.username, "password") - assert result is False + plugin1 = AuthenticatorPlugin(database_instance=local_authenticator) + # Simulate another worker/thread modifying the configuration + local_authenticator.configuration = config2 + local_authenticator.save() -# Test for timeout handling - if not timeout -@pytest.mark.django_db() -def test_get_controller_user_no_timeout_setting(user): - """ - Test that _get_controller_user uses default timeout when GRPC_SERVER_AUTH_SERVICE_TIMEOUT is not set. - """ - plugin = AuthenticatorPlugin() + plugin2 = AuthenticatorPlugin(database_instance=local_authenticator) - def mock_get_setting_no_timeout(setting_name): - if setting_name == 'gateway_proxy_url': - return 'http://controller.example.com' - elif setting_name == 'GRPC_SERVER_AUTH_SERVICE_TIMEOUT': - return None # No timeout setting - else: - return None + # Each plugin should read the current configuration from the database + assert plugin1.database_instance.configuration == config2 # DB was updated + assert plugin2.database_instance.configuration == config2 - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting_no_timeout): - with mock.patch('requests.get') as mock_get: - mock_response = mock.Mock() - mock_response.raise_for_status.return_value = None - mock_response.json.return_value = {"count": 1, "results": [{"ldap_dn": ""}]} - mock_get.return_value = mock_response + def test_fallback_state_not_shared(self, local_authenticator, user): + """Test that fallback authenticator state is not shared between calls.""" + local_authenticator.configuration = {'fallback_authentication': ['test.fallback.mock']} + local_authenticator.save() - result = plugin._get_controller_user(user.username, "password") + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + request = RequestFactory().post('/login') - # Verify that requests.get was called with the default timeout of 10 - mock_get.assert_called_once_with('http://controller.example.com/api/controller/v2/me/', auth=mock.ANY, timeout=10) # Default timeout should be 10 - assert result == {"ldap_dn": ""} + # Create two fallback instances to simulate parallel calls + fallback1 = MockFallbackAuthenticator() + fallback2 = MockFallbackAuthenticator() + call_count = [0] -@pytest.mark.django_db() -def test_get_controller_user_zero_timeout_setting(user): - """ - Test that _get_controller_user uses default timeout when GRPC_SERVER_AUTH_SERVICE_TIMEOUT converts to 0. - """ - plugin = AuthenticatorPlugin() + def mock_load(path, attr): + call_count[0] += 1 + if call_count[0] == 1: + return lambda *a, **k: fallback1 + return lambda *a, **k: fallback2 - def mock_get_setting_zero_timeout(setting_name): - if setting_name == 'gateway_proxy_url': - return 'http://controller.example.com' - elif setting_name == 'GRPC_SERVER_AUTH_SERVICE_TIMEOUT': - return '0s' # Zero timeout - else: - return None + with mock.patch('ansible_base.authentication.authenticator_plugins.local.import_object', side_effect=mock_load): + with mock.patch('django.contrib.auth.backends.ModelBackend.authenticate', return_value=None): + # First call + plugin._try_fallback_authenticators(request, user.username, 'password') + # Second call + plugin._try_fallback_authenticators(request, user.username, 'password') - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting_zero_timeout): - with mock.patch('requests.get') as mock_get: - mock_response = mock.Mock() - mock_response.raise_for_status.return_value = None - mock_response.json.return_value = {"count": 1, "results": [{"ldap_dn": ""}]} - mock_get.return_value = mock_response + # Both should have been called independently + assert fallback1.authenticate_called + assert fallback2.authenticate_called - result = plugin._get_controller_user(user.username, "password") - # Verify that requests.get was called with the default timeout of 10 (since 0 is falsy) - mock_get.assert_called_once_with('http://controller.example.com/api/controller/v2/me/', auth=mock.ANY, timeout=10) # Default timeout should be 10 - assert result == {"ldap_dn": ""} +# ============================================================================ +# Tests for edge cases +# ============================================================================ -@pytest.mark.django_db() -def test_get_controller_user_invalid_timeout_format(user): - """ - Test that _get_controller_user handles invalid timeout format and raises ValueError. - """ - plugin = AuthenticatorPlugin() +class TestEdgeCases: + """Tests for edge cases and boundary conditions.""" - def mock_get_setting_invalid_timeout(setting_name): - if setting_name == 'gateway_proxy_url': - return 'http://controller.example.com' - elif setting_name == 'GRPC_SERVER_AUTH_SERVICE_TIMEOUT': - return 'invalid_format' # Invalid timeout format - else: - return None - - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting_invalid_timeout): - # The ValueError should be raised by _convert_to_seconds, which will be caught in the generic exception handler - result = plugin._get_controller_user(user.username, "password") - assert result is None + def test_fallback_with_none_database_instance(self): + """Test handling when database_instance is None.""" + plugin = AuthenticatorPlugin(database_instance=None) + request = RequestFactory().post('/login') + result = plugin.authenticate(request, 'testuser', 'password') -# Test for generic exception handling -@pytest.mark.django_db() -def test_get_controller_user_generic_exception(user): - """ - Test that _get_controller_user handles generic exceptions gracefully. - """ - plugin = AuthenticatorPlugin() + assert result is None - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting): - with mock.patch('requests.get') as mock_get: - # Force a generic exception (not HTTP, Connection, Timeout, or JSON related) - mock_get.side_effect = Exception("Unexpected error") + def test_fallback_with_kwargs_passed_through(self, local_authenticator, user): + """Test that kwargs are passed through to fallback authenticators.""" + local_authenticator.configuration = {'fallback_authentication': ['test.fallback.mock']} + local_authenticator.save() - result = plugin._get_controller_user(user.username, "password") - assert result is None + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + request = RequestFactory().post('/login') + captured_kwargs = {} + class KwargsCapturingFallback: + def __init__(self, database_instance=None, configuration=None): + pass -@pytest.mark.django_db() -def test_get_controller_user_generic_exception_from_conversion(user): - """ - Test that _get_controller_user handles generic exceptions from _convert_to_seconds. - """ - plugin = AuthenticatorPlugin() + def authenticate(self, request, username, password, **kwargs): + captured_kwargs.update(kwargs) + return None - def mock_get_setting_exception(setting_name): - if setting_name == 'gateway_proxy_url': - return 'http://controller.example.com' - elif setting_name == 'GRPC_SERVER_AUTH_SERVICE_TIMEOUT': - return 'invalid_format' # This will cause ValueError in _convert_to_seconds - else: - return None + with mock.patch('ansible_base.authentication.authenticator_plugins.local.import_object', return_value=KwargsCapturingFallback): + with mock.patch('django.contrib.auth.backends.ModelBackend.authenticate', return_value=None): + plugin.authenticate(request, user.username, 'password', custom_param='value', another='param') - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting_exception): - result = plugin._get_controller_user(user.username, "password") - assert result is None + assert captured_kwargs['custom_param'] == 'value' + assert captured_kwargs['another'] == 'param' + def test_very_long_fallback_list(self, local_authenticator, user): + """Test handling of a very long list of fallbacks.""" + # Create a list of 20 fallback paths + fallback_list = [f'test.fallback.mock_{i}' for i in range(20)] + local_authenticator.configuration = {'fallback_authentication': fallback_list} + local_authenticator.save() -@pytest.mark.django_db() -def test_get_controller_user_generic_exception_during_urljoin(user): - """ - Test that _get_controller_user handles generic exceptions during URL joining. - """ - plugin = AuthenticatorPlugin() + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + request = RequestFactory().post('/login') - def mock_get_setting_none_url(setting_name): - if setting_name == 'gateway_proxy_url': - return None # This will cause early return - elif setting_name == 'GRPC_SERVER_AUTH_SERVICE_TIMEOUT': - return '30s' - else: - return None + # Make the last one succeed + call_count = [0] - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting_none_url): - result = plugin._get_controller_user(user.username, "password") - assert result is None + def mock_load(path, attr): + call_count[0] += 1 + if call_count[0] == 20: # Last one + return MockSuccessfulFallback + return MockFailingFallback + with mock.patch('ansible_base.authentication.authenticator_plugins.local.import_object', side_effect=mock_load): + with mock.patch('django.contrib.auth.backends.ModelBackend.authenticate', return_value=None): + result = plugin._try_fallback_authenticators(request, user.username, 'password') -@pytest.mark.django_db() -def test_get_controller_user_generic_exception_with_logging(user, expected_log): - """ - Test that _get_controller_user logs generic exceptions properly. - """ - plugin = AuthenticatorPlugin() + assert result == user + assert call_count[0] == 20 # All were tried - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting): - with mock.patch('requests.get') as mock_get: - # Create a custom exception that will be caught by the generic handler - class CustomException(Exception): - pass + def test_fallback_with_special_characters_in_username(self, local_authenticator, user): + """Test fallback authentication with special characters in username.""" + local_authenticator.configuration = {'fallback_authentication': ['test.fallback.mock']} + local_authenticator.save() - mock_get.side_effect = CustomException("Custom error") + plugin = AuthenticatorPlugin(database_instance=local_authenticator) + request = RequestFactory().post('/login') + special_username = "user@example.com" + captured_username = None - with expected_log('ansible_base.authentication.authenticator_plugins.local.logger', "warning", "An unexpected error occurred"): - result = plugin._get_controller_user(user.username, "password") - assert result is None + class UsernameCapturingFallback: + def __init__(self, database_instance=None, configuration=None): + pass + def authenticate(self, request, username, password, **kwargs): + nonlocal captured_username + captured_username = username + return None -@pytest.mark.django_db() -def test_get_controller_user_timeout_conversion_exception_handling(user): - """ - Test that _get_controller_user handles timeout conversion exceptions correctly. - """ - plugin = AuthenticatorPlugin() + with mock.patch('ansible_base.authentication.authenticator_plugins.local.import_object', return_value=UsernameCapturingFallback): + with mock.patch('django.contrib.auth.backends.ModelBackend.authenticate', return_value=None): + plugin.authenticate(request, special_username, 'password') - def mock_get_setting_bad_timeout(setting_name): - if setting_name == 'gateway_proxy_url': - return 'http://controller.example.com' - elif setting_name == 'GRPC_SERVER_AUTH_SERVICE_TIMEOUT': - return 'not_a_valid_duration' - else: - return None - - with mock.patch('ansible_base.authentication.authenticator_plugins.local.get_setting', side_effect=mock_get_setting_bad_timeout): - # This should handle the ValueError from _convert_to_seconds in the generic exception handler - result = plugin._get_controller_user(user.username, "password") - assert result is None + assert captured_username == special_username diff --git a/test_app/tests/lib/utils/test_duration.py b/test_app/tests/lib/utils/test_duration.py new file mode 100644 index 000000000..4bcd7998b --- /dev/null +++ b/test_app/tests/lib/utils/test_duration.py @@ -0,0 +1,178 @@ +""" +Tests for ansible_base.lib.utils.duration module. +""" + +import pytest + +from ansible_base.lib.utils.duration import convert_to_seconds + + +@pytest.mark.parametrize( + "duration_input,expected_seconds", + [ + # Positive seconds + pytest.param("15s", 15, id="positive_15_seconds"), + pytest.param("0s", 0, id="zero_seconds"), + pytest.param("1s", 1, id="one_second"), + pytest.param("100s", 100, id="positive_100_seconds"), + # Positive minutes + pytest.param("5m", 300, id="positive_5_minutes"), + pytest.param("0m", 0, id="zero_minutes"), + pytest.param("1m", 60, id="one_minute"), + pytest.param("10m", 600, id="positive_10_minutes"), + # Positive hours + pytest.param("1h", 3600, id="one_hour"), + pytest.param("0h", 0, id="zero_hours"), + pytest.param("2h", 7200, id="positive_2_hours"), + pytest.param("24h", 86400, id="positive_24_hours"), + # Positive days + pytest.param("2d", 172800, id="positive_2_days"), + pytest.param("0d", 0, id="zero_days"), + pytest.param("1d", 86400, id="one_day"), + pytest.param("7d", 604800, id="positive_7_days"), + # Positive weeks + pytest.param("1w", 604800, id="one_week"), + pytest.param("0w", 0, id="zero_weeks"), + pytest.param("2w", 1209600, id="positive_2_weeks"), + pytest.param("4w", 2419200, id="positive_4_weeks"), + # Plain integers (treated as seconds) + pytest.param("30", 30, id="plain_integer_30"), + pytest.param("0", 0, id="plain_integer_zero"), + pytest.param("100", 100, id="plain_integer_100"), + pytest.param("1", 1, id="plain_integer_1"), + # Negative seconds + pytest.param("-5s", -5, id="negative_5_seconds"), + pytest.param("-1s", -1, id="negative_1_second"), + pytest.param("-100s", -100, id="negative_100_seconds"), + # Negative minutes + pytest.param("-5m", -300, id="negative_5_minutes"), + pytest.param("-1m", -60, id="negative_1_minute"), + pytest.param("-10m", -600, id="negative_10_minutes"), + # Negative hours + pytest.param("-1h", -3600, id="negative_1_hour"), + pytest.param("-2h", -7200, id="negative_2_hours"), + pytest.param("-24h", -86400, id="negative_24_hours"), + # Negative days + pytest.param("-1d", -86400, id="negative_1_day"), + pytest.param("-2d", -172800, id="negative_2_days"), + pytest.param("-7d", -604800, id="negative_7_days"), + # Negative weeks + pytest.param("-1w", -604800, id="negative_1_week"), + pytest.param("-2w", -1209600, id="negative_2_weeks"), + # Negative plain integers + pytest.param("-30", -30, id="negative_plain_integer_30"), + pytest.param("-1", -1, id="negative_plain_integer_1"), + pytest.param("-100", -100, id="negative_plain_integer_100"), + # Case-insensitive units + pytest.param("15S", 15, id="uppercase_S_seconds"), + pytest.param("5M", 300, id="uppercase_M_minutes"), + pytest.param("1H", 3600, id="uppercase_H_hours"), + pytest.param("2D", 172800, id="uppercase_D_days"), + pytest.param("1W", 604800, id="uppercase_W_weeks"), + # Large numbers + pytest.param("999s", 999, id="large_999_seconds"), + pytest.param("999m", 59940, id="large_999_minutes"), + pytest.param("999h", 3596400, id="large_999_hours"), + pytest.param("365d", 31536000, id="large_365_days"), + pytest.param("52w", 31449600, id="large_52_weeks"), + ], +) +def test_convert_to_seconds_valid_inputs(duration_input, expected_seconds): + """Test convert_to_seconds with valid duration strings.""" + assert convert_to_seconds(duration_input) == expected_seconds + + +@pytest.mark.parametrize( + "invalid_input,default_value,expected_result", + [ + # Invalid string inputs + pytest.param("invalid", 10, 10, id="invalid_string_with_default_10"), + pytest.param("", 10, 10, id="empty_string_with_default_10"), + pytest.param("-", 10, 10, id="lone_minus_sign_with_default_10"), + pytest.param("s", 10, 10, id="unit_only_s_with_default_10"), + pytest.param("abc", 10, 10, id="alphabetic_string_with_default_10"), + pytest.param("15x", 10, 10, id="invalid_unit_x_with_default_10"), + pytest.param("m", 10, 10, id="unit_only_m_with_default_10"), + pytest.param("12.5s", 10, 10, id="float_not_supported_with_default_10"), + pytest.param("1h30m", 10, 10, id="multiple_units_not_supported_with_default_10"), + pytest.param(None, 10, 10, id="none_input_with_default_10"), + ], +) +def test_convert_to_seconds_invalid_inputs(invalid_input, default_value, expected_result): + """Test convert_to_seconds with invalid inputs returns the specified default value.""" + assert convert_to_seconds(invalid_input, default=default_value) == expected_result + + +@pytest.mark.parametrize( + "duration_input,default_value,expected_result", + [ + # Test 1: Invalid input, no custom default → returns function default (10) + pytest.param("invalid", 10, 10, id="invalid_no_custom_default_returns_10"), + pytest.param("", 10, 10, id="empty_no_custom_default_returns_10"), + pytest.param(None, 10, 10, id="none_no_custom_default_returns_10"), + # Test 2: Invalid input, custom default → returns custom default (not 10) + pytest.param("invalid", 0, 0, id="invalid_custom_default_0"), + pytest.param("invalid", 42, 42, id="invalid_custom_default_42"), + pytest.param("invalid", 100, 100, id="invalid_custom_default_100"), + pytest.param("invalid", -1, -1, id="invalid_custom_default_negative_1"), + pytest.param("-", 42, 42, id="lone_minus_custom_default_42"), + pytest.param("s", 99, 99, id="unit_only_custom_default_99"), + # Test 3: Valid input, no custom default → returns converted value (ignores implicit 10) + pytest.param("15s", 10, 15, id="valid_15s_no_custom_default"), + pytest.param("5m", 10, 300, id="valid_5m_no_custom_default"), + pytest.param("0", 10, 0, id="valid_0_no_custom_default"), + # Test 4: Valid input, custom default → returns converted value (ignores custom default) + pytest.param("15s", 999, 15, id="valid_15s_ignores_custom_default_999"), + pytest.param("5m", 42, 300, id="valid_5m_ignores_custom_default_42"), + pytest.param("1h", 0, 3600, id="valid_1h_ignores_custom_default_0"), + pytest.param("2d", -1, 172800, id="valid_2d_ignores_custom_default_negative_1"), + pytest.param("30", 100, 30, id="valid_plain_30_ignores_custom_default_100"), + pytest.param("-5s", 999, -5, id="valid_negative_5s_ignores_custom_default_999"), + pytest.param("0s", 42, 0, id="valid_0s_ignores_custom_default_42"), + ], +) +def test_convert_to_seconds_default_behavior(duration_input, default_value, expected_result): + """ + Test all default parameter scenarios in one comprehensive test. + + This test covers four key scenarios: + 1. Invalid input with function's default (10) → returns 10 + 2. Invalid input with custom default → returns custom default + 3. Valid input with function's default (10) → returns converted value, ignores 10 + 4. Valid input with custom default → returns converted value, ignores custom default + """ + assert convert_to_seconds(duration_input, default=default_value) == expected_result + + +@pytest.mark.parametrize( + "invalid_default_value,expected_result", + [ + # Boolean defaults (bool is subclass of int in Python, so needs explicit check) + pytest.param(True, 10, id="bool_true_logs_warning_returns_10"), + pytest.param(False, 10, id="bool_false_logs_warning_returns_10"), + ], +) +def test_convert_to_seconds_invalid_default_type(invalid_default_value, expected_result, caplog): + """ + Test that non-integer default values log a warning with stack trace and use 10 instead. + + Note: Only booleans are tested here because the project uses typeguard for runtime + type checking, which prevents other invalid types (str, float, list, dict, None) + from even reaching the function. This is the expected behavior - typeguard provides + the first line of defense, and our isinstance check catches booleans (which are + technically ints in Python but should not be accepted as defaults). + + The stack_info=True in the logger call provides developers with a full stack trace + showing exactly where convert_to_seconds was called with an invalid default. + """ + import logging + + with caplog.at_level(logging.WARNING): + result = convert_to_seconds("invalid", default=invalid_default_value) + + assert result == expected_result + assert "Invalid default value" in caplog.text + assert "Must be an integer" in caplog.text + assert "Using default of 10" in caplog.text + # Verify stack trace is included + assert "Stack (most recent call last)" in caplog.text diff --git a/test_app/tests/lib/utils/test_imports.py b/test_app/tests/lib/utils/test_imports.py new file mode 100644 index 000000000..ed9014a3e --- /dev/null +++ b/test_app/tests/lib/utils/test_imports.py @@ -0,0 +1,145 @@ +""" +Tests for ansible_base.lib.utils.imports module. +""" + +import pytest + +from ansible_base.lib.utils.imports import import_object + + +@pytest.mark.parametrize( + "import_path,default_attr,should_be_callable", + [ + # Full path imports (single argument) + pytest.param("django.conf.settings", None, False, id="django_settings_object"), + pytest.param("django.utils.text.slugify", None, True, id="django_slugify_function"), + pytest.param("django.contrib.auth.models.User", None, True, id="django_user_model_class"), + pytest.param("ansible_base.lib.utils.imports.import_object", None, True, id="import_object_itself"), + # Module + attribute imports (two arguments) + pytest.param("django.conf", "settings", False, id="django_settings_split"), + pytest.param("django.utils.text", "slugify", True, id="django_slugify_split"), + pytest.param("django.contrib.auth.models", "User", True, id="django_user_model_split"), + pytest.param("ansible_base.lib.utils.imports", "import_object", True, id="import_object_split"), + ], +) +def test_import_object_valid_imports(import_path, default_attr, should_be_callable): + """Test import_object successfully imports various objects.""" + result = import_object(import_path, default_attr) + assert result is not None + if should_be_callable: + assert callable(result) + + +@pytest.mark.parametrize( + "import_path,default_attr,exception_type,error_message_contains", + [ + # Invalid full paths + pytest.param( + "nonexistent_module.SomeClass", + None, + ImportError, + "No module named 'nonexistent_module'", + id="nonexistent_module_full_path", + ), + pytest.param( + "django.conf.NonExistentSetting", + None, + AttributeError, + "has no attribute 'NonExistentSetting'", + id="nonexistent_attribute_full_path", + ), + pytest.param( + "nodots", + None, + ValueError, + "Invalid import path", + id="no_dots_in_path", + ), + pytest.param( + "", + None, + ValueError, + "Invalid import path", + id="empty_string_full_path", + ), + # Invalid split imports + pytest.param( + "nonexistent_module", + "SomeClass", + ImportError, + "No module named 'nonexistent_module'", + id="nonexistent_module_split", + ), + pytest.param( + "django.conf", + "NonExistentSetting", + AttributeError, + "has no attribute 'NonExistentSetting'", + id="nonexistent_attribute_split", + ), + ], +) +def test_import_object_invalid_imports(import_path, default_attr, exception_type, error_message_contains): + """Test import_object raises appropriate exceptions for invalid imports.""" + with pytest.raises(exception_type) as exc_info: + import_object(import_path, default_attr) + assert error_message_contains in str(exc_info.value) + + +@pytest.mark.parametrize( + "module_path,attr_name,should_be_callable", + [ + pytest.param("os", "path", False, id="os_path_module"), + pytest.param("collections", "OrderedDict", True, id="ordereddict_class"), + pytest.param("json", "dumps", True, id="json_dumps_function"), + ], +) +def test_import_object_standard_library(module_path, attr_name, should_be_callable): + """Test import_object works with Python standard library.""" + result = import_object(module_path, attr_name) + assert result is not None + if should_be_callable: + assert callable(result) + + +def test_import_object_same_result_both_formats(): + """Test that both invocation formats return the same object.""" + # Import using full path + result1 = import_object("django.utils.text.slugify") + + # Import using module + attribute + result2 = import_object("django.utils.text", "slugify") + + # Should be the exact same function object + assert result1 is result2 + + +def test_import_object_callable_result(): + """Test that imported functions are actually callable.""" + slugify = import_object("django.utils.text.slugify") + result = slugify("Hello World") + assert result == "hello-world" + + +def test_import_object_class_instantiation(): + """Test that imported classes can be instantiated.""" + OrderedDict = import_object("collections", "OrderedDict") + instance = OrderedDict() + assert isinstance(instance, dict) + + +@pytest.mark.parametrize( + "import_path,default_attr", + [ + pytest.param("django.conf.settings", None, id="settings_full_path"), + pytest.param("django.conf", "settings", id="settings_split_path"), + ], +) +def test_import_object_consistency(import_path, default_attr): + """Test that import_object returns consistent results across multiple calls.""" + result1 = import_object(import_path, default_attr) + result2 = import_object(import_path, default_attr) + result3 = import_object(import_path, default_attr) + + # All three should be the exact same object + assert result1 is result2 is result3 diff --git a/test_app/tests/lib/utils/test_views.py b/test_app/tests/lib/utils/test_views.py index a2bb12539..13f6d4e8d 100644 --- a/test_app/tests/lib/utils/test_views.py +++ b/test_app/tests/lib/utils/test_views.py @@ -70,7 +70,7 @@ def test_ansible_base_view_parent_view(caplog, setting, log_message, default_par def test_ansible_base_view_parent_view_exception(caplog): with override_settings(ANSIBLE_BASE_CUSTOM_VIEW_PARENT='does.not.exist'): with caplog.at_level(logging.ERROR): - with mock.patch('ansible_base.lib.utils.settings.get_from_import', side_effect=ImportError("Test Exception")): + with mock.patch('ansible_base.lib.utils.imports.import_object', side_effect=ImportError("Test Exception")): import ansible_base.lib.utils.views.django_app_api importlib.reload(ansible_base.lib.utils.views.django_app_api)