Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions ansible_base/authentication/utils/claims.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
from ansible_base.authentication.models import Authenticator, AuthenticatorMap, AuthenticatorUser
from ansible_base.authentication.utils.authenticator_map import check_role_type, expand_syntax
from ansible_base.lib.abstract_models import AbstractOrganization, AbstractTeam, CommonModel
from ansible_base.lib.utils.apps import is_rbac_installed
from ansible_base.lib.utils.auth import get_organization_model, get_team_model
from ansible_base.lib.utils.string import is_empty
from ansible_base.rbac.models import DABContentType
from ansible_base.rbac.remote import get_local_resource_prefix

from .trigger_definition import TRIGGER_DEFINITION

Expand Down Expand Up @@ -722,7 +721,7 @@ def reconcile_user_claims(cls, user: AbstractUser, authenticator_user: Authentic

claims = getattr(user, 'claims', authenticator_user.claims)

if 'ansible_base.rbac' in settings.INSTALLED_APPS:
if is_rbac_installed():
cls(claims, user, authenticator_user).manage_permissions()
else:
logger.info(_("Skipping user claims with RBAC roles, because RBAC app is not installed"))
Expand Down Expand Up @@ -876,7 +875,11 @@ class RoleUserAssignmentsCache:
def __init__(self):
self.cache = {}
# NOTE(cutwater): We may probably execute this query once and cache the query results.
self.content_types = {content_type.model: content_type for content_type in DABContentType.objects.get_for_models(Organization, Team).values()}
self.content_types = {}
if is_rbac_installed():
from ansible_base.rbac.models import DABContentType

self.content_types = {content_type.model: content_type for content_type in DABContentType.objects.get_for_models(Organization, Team).values()}
self.role_definitions = {}

def items(self):
Expand Down Expand Up @@ -956,6 +959,12 @@ def cache_existing(self, role_assignments: Iterable[models.Model]) -> None:
- All cached assignments are marked with STATUS_EXISTING status
- Role definitions are also cached separately in self.role_definitions
"""
local_resource_prefixes = ["shared"]
if is_rbac_installed():
from ansible_base.rbac.remote import get_local_resource_prefix

local_resource_prefixes.append(get_local_resource_prefix())

for role_assignment in role_assignments:
# Cache role definition
if (role_definition := self._rd_by_id(role_assignment)) is None:
Expand All @@ -965,7 +974,7 @@ def cache_existing(self, role_assignments: Iterable[models.Model]) -> None:
# Skip role assignments that should not be cached
if not (
role_assignment.content_type is None # Global/system roles (e.g., System Auditor)
or role_assignment.content_type.service in [get_local_resource_prefix(), "shared"]
or role_assignment.content_type.service in local_resource_prefixes
): # Local object roles
continue

Expand Down
11 changes: 6 additions & 5 deletions ansible_base/lib/routers/association_resource_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
from rest_framework.response import Response
from rest_framework.viewsets import ViewSetMixin

from ansible_base.rbac.permission_registry import permission_registry

logger = logging.getLogger('ansible_base.lib.routers.association_resource_router')


Expand Down Expand Up @@ -119,10 +117,13 @@ def check_parent_object_permissions(self, request, parent_obj: Model) -> None:
will not check "change" permissions to the parent object on POST
this method checks parent change permission, view permission should be handled by filter_queryset
"""
if (request.method not in SAFE_METHODS) and 'ansible_base.rbac' in settings.INSTALLED_APPS and permission_registry.is_registered(parent_obj):
from ansible_base.rbac.policies import check_content_obj_permission
if (request.method not in SAFE_METHODS) and 'ansible_base.rbac' in settings.INSTALLED_APPS:
from ansible_base.rbac.permission_registry import permission_registry

if permission_registry.is_registered(parent_obj):
from ansible_base.rbac.policies import check_content_obj_permission

check_content_obj_permission(request.user, parent_obj)
check_content_obj_permission(request.user, parent_obj)

def get_parent_object(self) -> Model:
"""Modeled mostly after DRF get_object, but for the parent model
Expand Down
10 changes: 6 additions & 4 deletions ansible_base/lib/testing/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ def delete_authenticator(authenticator):
class StaticResourceAPIClient(ResourceAPIClient):
"""A testing API client that reads response router attribute or static files."""

router = {}
# Route is used to force a certain status,response for a route
# It has to be a mutable default but the fixture instantiates one for
# each test.
def __init__(self, *args, **kwargs):
# Route is used to force a certain status,response for a route
# It has to be a mutable default but the fixture instantiates one for
# each test.
self.router = {}
super().__init__(*args, **kwargs)

def _make_request(self, method, path, data=None, params=None, stream=False):
response = Response()
Expand Down
4 changes: 4 additions & 0 deletions ansible_base/lib/utils/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
def is_rbac_installed() -> bool:
from django.conf import settings

return bool('ansible_base.rbac' in settings.INSTALLED_APPS)
19 changes: 12 additions & 7 deletions ansible_base/resource_registry/registry.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from collections import namedtuple
from typing import List, Optional

from django.conf import settings
from django.contrib.auth import authenticate
from django.utils.translation import gettext_lazy as _

Expand All @@ -23,12 +24,16 @@ class ServiceAPIConfig:
This will be the interface for configuring the resource registry for each service.
"""

_default_resource_processors = {
"shared.team": ResourceTypeProcessor,
"shared.organization": ResourceTypeProcessor,
"shared.user": ResourceTypeProcessor,
"shared.roledefinition": RoleDefinitionProcessor,
}
@classmethod
def _get_default_resource_processors(cls):
processors = {
"shared.team": ResourceTypeProcessor,
"shared.organization": ResourceTypeProcessor,
"shared.user": ResourceTypeProcessor,
}
if 'ansible_base.rbac' in settings.INSTALLED_APPS:
processors["shared.roledefinition"] = RoleDefinitionProcessor
return processors

custom_resource_processors = {}

Expand All @@ -43,7 +48,7 @@ def authenticate_local_user(username: str, password: str):

@classmethod
def get_processor(cls, resource_type):
combined_processors = {**cls._default_resource_processors, **cls.custom_resource_processors}
combined_processors = {**cls._get_default_resource_processors(), **cls.custom_resource_processors}
return combined_processors[resource_type]


Expand Down
11 changes: 11 additions & 0 deletions ansible_base/resource_registry/rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,16 @@
import urllib3
from django.apps import apps

from ansible_base.lib.utils.apps import is_rbac_installed
from ansible_base.resource_registry.resource_server import get_resource_server_config, get_service_token


def _check_rbac_installed():
"""Check if ansible_base.rbac is installed and raise RuntimeError if not."""
if not is_rbac_installed():
raise RuntimeError("This operation requires ansible_base.rbac to be installed")


ResourceRequestBody = namedtuple(
"ResourceRequestBody",
["ansible_id", "service_id", "is_partially_migrated", "resource_type", "resource_data"],
Expand Down Expand Up @@ -186,6 +194,7 @@ def list_team_assignments(self, team_ansible_id: Optional[str] = None, filters:
return self._make_request("get", "role-team-assignments/", params=params)

def sync_assignment(self, assignment):
_check_rbac_installed()
from ansible_base.rbac.service_api.serializers import ServiceRoleTeamAssignmentSerializer, ServiceRoleUserAssignmentSerializer

if assignment._meta.model_name == 'roleuserassignment':
Expand All @@ -196,6 +205,7 @@ def sync_assignment(self, assignment):
return self._sync_assignment(serializer.data)

def sync_unassignment(self, role_definition, actor, content_object):
_check_rbac_installed()
data = {'role_definition': role_definition.name}
data[f'{actor._meta.model_name}_ansible_id'] = str(actor.resource.ansible_id)

Expand All @@ -214,6 +224,7 @@ def sync_unassignment(self, role_definition, actor, content_object):

def sync_object_deletion(self, content_object):
"""Sync object deletion to Gateway for cleanup of all related role assignments"""
_check_rbac_installed()
from ansible_base.rbac.models import DABContentType

# Get the content type information
Expand Down
26 changes: 19 additions & 7 deletions ansible_base/resource_registry/shared_types.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from rest_framework import serializers
from rest_framework.exceptions import ValidationError

from ansible_base.rbac.models import DABContentType, DABPermission
from ansible_base.lib.utils.apps import is_rbac_installed
from ansible_base.resource_registry.utils.resource_type_serializers import AnsibleResourceForeignKeyField, SharedResourceTypeSerializer
from ansible_base.resource_registry.utils.sso_provider import get_sso_provider_server

Expand Down Expand Up @@ -84,6 +84,8 @@ class LenientPermissionSlugListField(serializers.ListField):
child = serializers.CharField()

def to_internal_value(self, data):
from ansible_base.rbac.models import DABPermission

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: RBAC Dependency Issue in Permission Slug Field

The LenientPermissionSlugListField.to_internal_value method imports DABPermission without checking if ansible_base.rbac is installed. This causes an ImportError when RBAC is unavailable, which is less clear than an explicit RuntimeError. While RoleDefinitionType includes an RBAC check, this field could be used independently and would fail.

Fix in Cursor Fix in Web

data = super().to_internal_value(data)
return list(DABPermission.objects.filter(api_slug__in=data))

Expand All @@ -98,14 +100,24 @@ class RoleDefinitionType(SharedResourceTypeSerializer):
name = serializers.CharField()
description = serializers.CharField(default="", allow_blank=True)
managed = serializers.BooleanField()
content_type = serializers.SlugRelatedField(
slug_field='api_slug',
queryset=DABContentType.objects.all(),
allow_null=True,
default=None,
)
permissions = LenientPermissionSlugListField()

def __init__(self, *args, **kwargs):
if not is_rbac_installed():
raise RuntimeError("RoleDefinitionType requires ansible_base.rbac to be installed")

super().__init__(*args, **kwargs)

# Set up content_type field only when rbac is available
from ansible_base.rbac.models import DABContentType

self.fields['content_type'] = serializers.SlugRelatedField(
slug_field='api_slug',
queryset=DABContentType.objects.all(),
allow_null=True,
default=None,
)

def is_valid(self, raise_exception=False):
try:
return super().is_valid(raise_exception=raise_exception)
Expand Down
26 changes: 23 additions & 3 deletions ansible_base/resource_registry/tasks/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from django.db.utils import Error, IntegrityError
from requests import HTTPError

from ansible_base.rbac.models.role import AssignmentBase, RoleDefinition, RoleTeamAssignment, RoleUserAssignment
from ansible_base.lib.utils.apps import is_rbac_installed
from ansible_base.resource_registry.models import Resource, ResourceType
from ansible_base.resource_registry.models.service_identifier import service_id
from ansible_base.resource_registry.registry import get_registry
Expand Down Expand Up @@ -139,7 +139,9 @@ def fetch_manifest(
return [ManifestItem(**row) for row in csv_reader]


def get_ansible_id_or_pk(assignment: AssignmentBase) -> str:
def get_ansible_id_or_pk(assignment) -> str:
if not is_rbac_installed():
raise RuntimeError("get_ansible_id_or_pk requires ansible_base.rbac to be installed")
# For object-scoped assignments, try to get the object's ansible_id
if assignment.content_type.model in ('organization', 'team'):
object_resource = Resource.objects.filter(object_id=assignment.object_id, content_type__model=assignment.content_type.model).first()
Expand All @@ -153,7 +155,9 @@ def get_ansible_id_or_pk(assignment: AssignmentBase) -> str:
return str(ansible_id_or_pk)


def get_content_object(role_definition: RoleDefinition, assignment_tuple: AssignmentTuple) -> Any:
def get_content_object(role_definition, assignment_tuple: AssignmentTuple) -> Any:
if not is_rbac_installed():
raise RuntimeError("get_content_object requires ansible_base.rbac to be installed")
content_object = None
if role_definition.content_type.model in ('organization', 'team'):
object_resource = Resource.objects.get(ansible_id=assignment_tuple.ansible_id_or_pk)
Expand Down Expand Up @@ -238,6 +242,10 @@ def get_remote_assignments(api_client: ResourceAPIClient) -> set[AssignmentTuple

def get_local_assignments() -> set[AssignmentTuple]:
"""Get local assignments and convert to tuples."""
if not is_rbac_installed():
raise RuntimeError("get_local_assignments requires ansible_base.rbac to be installed")
from ansible_base.rbac.models.role import RoleTeamAssignment, RoleUserAssignment

assignments = set()

# Get user assignments
Expand Down Expand Up @@ -294,6 +302,10 @@ def get_local_assignments() -> set[AssignmentTuple]:

def delete_local_assignment(assignment_tuple: AssignmentTuple) -> bool:
"""Delete a local assignment based on the tuple."""
if not is_rbac_installed():
raise RuntimeError("delete_local_assignment requires ansible_base.rbac to be installed")
from ansible_base.rbac.models.role import RoleDefinition

try:
role_definition = RoleDefinition.objects.get(name=assignment_tuple.role_definition_name)

Expand All @@ -320,6 +332,10 @@ def delete_local_assignment(assignment_tuple: AssignmentTuple) -> bool:

def create_local_assignment(assignment_tuple: AssignmentTuple) -> bool:
"""Create a local assignment based on the tuple."""
if not is_rbac_installed():
raise RuntimeError("create_local_assignment requires ansible_base.rbac to be installed")
from ansible_base.rbac.models.role import RoleDefinition

try:
role_definition = RoleDefinition.objects.get(name=assignment_tuple.role_definition_name)

Expand Down Expand Up @@ -694,6 +710,10 @@ def _sync_assignments(self):
if not self.sync_assignments:
return

if not is_rbac_installed():
self.write(">>> Skipping role assignments sync (rbac not installed)")
return

self.write(">>> Syncing role assignments")

try:
Expand Down
20 changes: 14 additions & 6 deletions test_app/resource_api.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from django.conf import settings
from django.contrib.auth import get_user_model

from ansible_base.authentication.models import Authenticator
from ansible_base.rbac.models import RoleDefinition
from ansible_base.resource_registry.registry import ResourceConfig, ServiceAPIConfig, SharedResource
from ansible_base.resource_registry.shared_types import OrganizationType, RoleDefinitionType, TeamType, UserType
from ansible_base.resource_registry.shared_types import OrganizationType, TeamType, UserType
from ansible_base.resource_registry.utils.resource_type_processor import ResourceTypeProcessor
from test_app.models import Organization, Original1, Proxy2, ResourceMigrationTestModel, Team

Expand Down Expand Up @@ -38,13 +38,21 @@ class APIConfig(ServiceAPIConfig):
Organization,
shared_resource=SharedResource(serializer=OrganizationType, is_provider=False),
),
ResourceConfig(
RoleDefinition,
shared_resource=SharedResource(serializer=RoleDefinitionType, is_provider=False),
),
# Authenticators won't be a shared resource in production, but it's a convenient model to use for testing.
ResourceConfig(Authenticator),
ResourceConfig(ResourceMigrationTestModel),
ResourceConfig(Original1),
ResourceConfig(Proxy2),
]

# Conditionally add RoleDefinition if RBAC is installed
if 'ansible_base.rbac' in settings.INSTALLED_APPS:
from ansible_base.rbac.models import RoleDefinition
from ansible_base.resource_registry.shared_types import RoleDefinitionType

RESOURCE_LIST.append(
ResourceConfig(
RoleDefinition,
shared_resource=SharedResource(serializer=RoleDefinitionType, is_provider=False),
)
)
Loading