Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
3d2fae4
Refactor duration conversion utility to lib/utils module
john-westcott-iv Oct 17, 2025
6676d26
Add fallback authentication support to local authenticator
john-westcott-iv Oct 17, 2025
3002fd8
Fixing minor issues
john-westcott-iv Oct 20, 2025
e8656fa
Refactor module path validation to use regex pattern
john-westcott-iv Oct 20, 2025
3fd4266
Add comprehensive test coverage for duration.py utility
john-westcott-iv Oct 20, 2025
112616d
Optimize module path validation by compiling regex once
john-westcott-iv Oct 20, 2025
93fb36b
Refactor convert_to_seconds with cleaner implementation
john-westcott-iv Oct 20, 2025
2f1779d
[AAP-49757] Add exception logging to convert_to_seconds
john-westcott-iv Oct 20, 2025
0d8360d
[AAP-49757] Further parameterize test_duration tests to reduce duplic…
john-westcott-iv Oct 20, 2025
18806a7
[AAP-49757] Completely parameterize test_duration with descriptive IDs
john-westcott-iv Oct 20, 2025
f6a45ae
[AAP-49757] Add tests for valid inputs ignoring default parameter
john-westcott-iv Oct 20, 2025
e9ea19b
Add runtime validation for default parameter in convert_to_seconds
john-westcott-iv Oct 20, 2025
e8653b7
Create import_object utility and consolidate import logic
john-westcott-iv Oct 20, 2025
34c4b45
Fix test_local.py to use import_object instead of removed _load_fallb…
john-westcott-iv Oct 20, 2025
3aebe8c
Revert incorrect fallback parameter passing and fix test
john-westcott-iv Oct 20, 2025
06c2ef2
Use concise character class syntax '\w' in regex patterns
john-westcott-iv Oct 20, 2025
c7a435d
Refactor regex patterns to eliminate duplication
john-westcott-iv Oct 20, 2025
156d9c4
test
BrennanPaciorek Oct 23, 2025
e8da3f4
Revert "test"
BrennanPaciorek Oct 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 97 additions & 153 deletions ansible_base/authentication/authenticator_plugins/local.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import logging
from urllib.parse import urljoin

import requests
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import ModelBackend
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _
from requests.auth import HTTPBasicAuth
from rest_framework import serializers
from rest_framework.serializers import ValidationError

from ansible_base.authentication.authenticator_plugins.base import AbstractAuthenticatorPlugin, BaseAuthenticatorConfiguration
from ansible_base.authentication.utils.authentication import get_or_create_authenticator_user
from ansible_base.authentication.utils.claims import update_user_claims
from ansible_base.lib.utils.settings import get_setting
from ansible_base.lib.serializers.fields import ListField
from ansible_base.lib.utils.imports import MODULE_PATH_PATTERN, import_object

logger = logging.getLogger('ansible_base.authentication.authenticator_plugins.local')

Expand All @@ -24,10 +23,42 @@
class LocalConfiguration(BaseAuthenticatorConfiguration):
documentation_url = "https://docs.djangoproject.com/en/4.2/ref/contrib/auth/#django.contrib.auth.backends.ModelBackend"

def validate(self, data):
if data != {}:
raise ValidationError(_({"configuration": "Can only be {} for local authenticators"}))
return data
fallback_authentication = ListField(
help_text=_(
'List of fallback authentication handler modules to attempt when primary authentication fails. '
'Each item should be a Python module path containing a FallbackAuthenticator class '
'(e.g., "my_app.authentication.fallbacks.my_fallback_service"). '
'The module must contain a class named "FallbackAuthenticator". '
'Fallbacks are attempted in the order specified.'
),
allow_null=True,
required=False,
default=[],
ui_field_label=_('Fallback Authentication Handlers'),
child=serializers.CharField(),
)

def validate(self, attrs):
"""
Validate the configuration and ensure fallback authenticators are valid module paths.
"""
# Call parent validation
attrs = super().validate(attrs)

# Validate fallback_authentication module paths
fallback_paths = attrs.get('fallback_authentication', [])
if fallback_paths:
errors = {}
for index, path in enumerate(fallback_paths):
if not isinstance(path, str):
errors[index] = _('Must be a string representing a Python module path')
elif not MODULE_PATH_PATTERN.match(path):
errors[index] = _('Invalid module path format. Must be a valid Python module path with at least one dot (e.g., "myapp.fallbacks.handler")')

if errors:
raise ValidationError({'fallback_authentication': errors})

return attrs


class AuthenticatorPlugin(ModelBackend, AbstractAuthenticatorPlugin):
Expand All @@ -50,25 +81,12 @@ def authenticate(self, request, username=None, password=None, **kwargs):
logger.info(f"Local authenticator {self.database_instance.name} is disabled, skipping")
return None

# Try standard ModelBackend authentication first
user = super().authenticate(request, username, password, **kwargs)
controller_login_results = None
if (
not user
and request
and request.path.startswith('/api/gateway/v1/login/')
and (controller_login_results := self._can_authenticate_from_controller(username, password))
):
logger.warning("User has been validated by controller, updating gateway user.")
self.update_gateway_user(username, password)
user = super().authenticate(request, username, password, **kwargs)
elif not user:
logger.info(
"Fallback authentication condition not met: "
f"username={username}, "
f"request={'set' if request else 'None'}, "
f"login_path={'True' if request and request.path.startswith('/api/gateway/v1/login/') else 'False'}, "
f"controller_login_results={controller_login_results}"
)

# If authentication failed, try fallback authenticators
if not user:
user = self._try_fallback_authenticators(request, username, password, **kwargs)

# This auth class doesn't create any new local users, but we still need to make sure
# it has an AuthenticatorUser associated with it.
Expand All @@ -88,137 +106,63 @@ def authenticate(self, request, username=None, password=None, **kwargs):
)
return update_user_claims(user, self.database_instance, [])

def _can_authenticate_from_controller(self, username, password):
"""
Check if a user exists in the AuthenticatorUser table with the local authenticator provider.
If the user is valid, update the gateway users credentials with the controller credentials.
def _try_fallback_authenticators(self, request, username, password, **kwargs):
"""
try:
user = UserModel._default_manager.get_by_natural_key(username)
except UserModel.DoesNotExist:
logger.warning(f"User '{username}' does not exist in the database.")
return False

# Skip controller authentication if user has use_controller_password field set to False
# Default to False when field doesn't exist (test environments)
if not getattr(user, 'use_controller_password', False):
logger.warning(f"User '{username}' password not in Controller.")
return False

if controller_user := self._get_controller_user(username, password):
# Validate controller_user has a ldap_dn field, if it is not None, then the user is a local user
ldap_dn = controller_user.get("ldap_dn")
if ldap_dn is None or ldap_dn != "":
logger.warning(f"User '{username}' is an ldap user and can not be authenticated.")
return False
if controller_user.get('password', None) != "$encrypted$":
logger.warning(f"User '{username}' is an enterprise user and can not be authenticated.")
return False
return True
else:
return False
Try each configured fallback authenticator in order.

def _get_controller_user(self, username: str, password: str):
"""
Get the user from the controller by making a request to the controller API /me/ endpoint.
If the user is not found, return None.
If the user is found, return the user.
"""
Fallback authenticators are loaded as plugins from the 'fallback_authentication' configuration
field, which should be a list of module paths. Each module must contain a class named
'FallbackAuthenticator'.

controller_base_domain = get_setting('gateway_proxy_url')
if not controller_base_domain:
logger.warning("Controller authentication failed, unable to get controller base domain")
return None
controller_url = urljoin(controller_base_domain, "/api/controller/v2/me/")

timeout = get_setting('GRPC_SERVER_AUTH_SERVICE_TIMEOUT')
timeout = self._convert_to_seconds(timeout)

try:
response = requests.get(controller_url, auth=HTTPBasicAuth(username, password), timeout=int(timeout))
response.raise_for_status()
user_data = response.json()

# Check if count exists and equals 1
count = user_data.get("count")
if count != 1:
logger.warning(f"Unable to authenticate user '{username}' with controller.")
return None

# Check if results exists and is a non-empty list
results = user_data.get("results")
if not results or not isinstance(results, list) or len(results) == 0:
logger.info(f"Unable to authenticate user '{username}' with controller. Invalid or empty results.")
return None
if not isinstance(results[0], dict):
logger.warning(f"Unable to authenticate user '{username}' with controller. user was not a dictionary.")
return False

return results[0]
except requests.exceptions.HTTPError as http_err:
logger.warning(f"HTTP error occurred: {http_err}")
return None
except requests.exceptions.ConnectionError as conn_err:
logger.warning(f"Connection error occurred: {conn_err}")
return None
except requests.exceptions.Timeout as timeout_err:
logger.warning(f"Timeout error occurred: {timeout_err}")
return None
except requests.exceptions.RequestException as err:
logger.warning(f"An unexpected error occurred: {err}")
return None
except ValueError as json_err:
logger.warning(f"JSON decode error occurred: {json_err}")
return None
except Exception as err:
logger.warning(f"An unexpected error occurred: {err}")
return None
Example:
Configuration: ['my_service.authentication.fallbacks.my_fallback_service']
Loads: my_service.authentication.fallbacks.my_fallback_service.FallbackAuthenticator

def update_gateway_user(self, username, password):
"""
Update the gateway user with the controller credentials and set is_partially_migrated to False.
"""
user = UserModel._default_manager.get_by_natural_key(username)
user.set_password(password)
Each fallback authenticator is instantiated and checked to see if it should be attempted
using its should_attempt() method. If so, its authenticate() method is called.

# Set use_controller_password to False if the field exists
update_fields = ['password']
if hasattr(user, 'use_controller_password'):
user.use_controller_password = False
update_fields.append('use_controller_password')
If a fallback authenticator returns a user, we use that user. Otherwise, we continue
to the next fallback authenticator.

user.save(update_fields=update_fields)
logger.info(f"Updated user {username} gateway account")
Args:
request: The HTTP request object
username: The username to authenticate
password: The password to authenticate
**kwargs: Additional authentication parameters

def _convert_to_seconds(self, s):
"""
Converts a time string like '15s', '5m', '1h', '2d', '3w' to seconds.
Returns:
The authenticated user object if successful, None otherwise
"""
default = 10
try:
unit = s[-1].lower()
value = int(s[:-1])

ret_val = 0
# Check units
if unit == '-':
ret_val = default
elif unit == 's':
ret_val = value
elif unit == 'm':
ret_val = value * 60
elif unit == 'h':
ret_val = value * 3600 # 60 * 60
elif unit == 'd':
ret_val = value * 86400 # 60 * 60 * 24
elif unit == 'w':
ret_val = value * 604800 # 60 * 60 * 24 * 7
else:
ret_val = int(s)
# If less than or equal to 0, return default
if ret_val <= 0:
ret_val = default
return ret_val
except Exception:
logger.warning(f"Invalid duration format: '{s}'")
return default
configuration = self.database_instance.configuration if self.database_instance else {}
fallback_paths = configuration.get('fallback_authentication', [])

for module_path in fallback_paths:
try:
# Load the fallback plugin (must contain a class named 'FallbackAuthenticator')
fallback_class = import_object(module_path, 'FallbackAuthenticator')

# Instantiate the fallback authenticator
fallback_authenticator = fallback_class()

logger.info(f"Attempting fallback authenticator: {module_path}")

# Try the fallback authentication
user = fallback_authenticator.authenticate(request, username, password, **kwargs)

# If fallback returned a user, use it
if user:
logger.info(f"Fallback authenticator {module_path} returned user {user.username}")
return user

except (ImportError, AttributeError, ValueError) as e:
logger.error(f"Failed to load fallback authenticator plugin from {module_path}: {e}")
continue
except Exception as e:
logger.error(f"Error in fallback authenticator {module_path}: {e}")
continue

if not fallback_paths:
logger.debug("No fallback authenticators configured")
else:
logger.debug("All fallback authenticators exhausted, authentication failed")
return None
7 changes: 4 additions & 3 deletions ansible_base/authentication/authenticator_plugins/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from django.db.models.fields import uuid
from django.utils.text import slugify

from ansible_base.lib.utils.imports import import_object

logger = logging.getLogger('ansible_base.authentication.authenticator_plugins.utils')
setting = 'ANSIBLE_BASE_AUTHENTICATOR_CLASS_PREFIXES'

Expand All @@ -33,9 +35,8 @@ def get_authenticator_class(authenticator_type: str):
raise ImportError("Must pass authenticator type to import")
try:
logger.debug(f"Attempting to load class {authenticator_type}")
auth_class = __import__(authenticator_type, globals(), locals(), ['AuthenticatorPlugin'], 0)
return auth_class.AuthenticatorPlugin
except Exception as e:
return import_object(authenticator_type, 'AuthenticatorPlugin')
except (ValueError, ImportError, AttributeError) as e:
logger.exception(f"The specified authenticator type {authenticator_type} could not be loaded, see exception below")
raise ImportError(f"The specified authenticator type {authenticator_type} could not be loaded") from e

Expand Down
83 changes: 83 additions & 0 deletions ansible_base/lib/utils/duration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""
Utility functions for parsing and converting duration/time strings.
"""

import logging
import re
from typing import Optional

logger = logging.getLogger('ansible_base.lib.utils.duration')


DURATION_CHAR_TO_SECONDS = {
's': 1,
'm': 60,
'h': 3600,
'd': 86400,
'w': 604800,
}

DURATION_RE = re.compile(r"^(-?\d+)([smhdw]?)$")


def convert_to_seconds(duration_string: Optional[str], default: int = 10) -> int:
"""
Converts a duration string like '15s', '5m', '1h', '2d', '3w' to seconds.
This function parses duration strings and converts them to seconds. It allows
negative values, leaving validation to the caller based on their use case.
Args:
duration_string: A string representing a duration with a unit suffix.
Supported units: s (seconds), m (minutes), h (hours),
d (days), w (weeks). Can also be a plain integer string
for seconds. Negative values are supported. Case-insensitive.
default: The default value to return if the input is invalid or cannot
be parsed. Must be an integer. Defaults to 10 seconds. If a non-integer
value is provided, a warning with stack trace is logged and 10 is used instead.
Returns:
int: The duration in seconds (can be negative), or the default value if invalid.
Examples:
>>> convert_to_seconds('15s')
15
>>> convert_to_seconds('5m')
300
>>> convert_to_seconds('1h')
3600
>>> convert_to_seconds('2d')
172800
>>> convert_to_seconds('1w')
604800
>>> convert_to_seconds('30')
30
>>> convert_to_seconds('-5s')
-5
>>> convert_to_seconds('-1d')
-86400
>>> convert_to_seconds('invalid')
10
>>> convert_to_seconds('invalid', default=42)
42
>>> convert_to_seconds('invalid', default='not_an_int') # Logs warning with stack trace, returns 10
10
"""
# Validate that default is an integer (but not a boolean, which is a subclass of int in Python)
if isinstance(default, bool) or not isinstance(default, int):
logger.warning(f"Invalid default value: '{default}' (type: {type(default).__name__}). Must be an integer. Using default of 10.", stack_info=True)
default = 10

try:
if duration_string is None:
raise ValueError("Duration string is None")

if matches := DURATION_RE.match(duration_string.lower()):
number = int(matches.group(1)) # The numeric part (can be negative)
unit = matches.group(2) or 's' # The unit character, default to 's'
return number * DURATION_CHAR_TO_SECONDS[unit]
else:
raise ValueError("Invalid duration format")
except Exception as e:
logger.warning(f"Invalid duration format: '{duration_string}' ({e}), return default of {default}")
return default
Loading