diff --git a/stytch/b2b/api/rbac_organizations.py b/stytch/b2b/api/rbac_organizations.py index 7e17a01..797425c 100644 --- a/stytch/b2b/api/rbac_organizations.py +++ b/stytch/b2b/api/rbac_organizations.py @@ -6,9 +6,13 @@ from __future__ import annotations -from typing import Any, Dict, Union +from typing import Any, Dict, Set, Union -from stytch.b2b.models.rbac import OrgPolicy +from stytch.b2b.models.rbac import ( + OrgPolicy, + Policy as B2BPolicy, + PolicyResource, +) from stytch.b2b.models.rbac_organizations import ( GetOrgPolicyResponse, SetOrgPolicyResponse, @@ -162,3 +166,45 @@ async def set_org_policy_async( ) res = await self.async_client.put(url, data, headers) return SetOrgPolicyResponse.from_json(res.response.status, res.json) + + # MANUAL(validate_org_policy)(SERVICE_METHOD) + # ADDIMPORT: from stytch.b2b.models.rbac import Policy as B2BPolicy + # ADDIMPORT: from typing import Set + @staticmethod + def validate_org_policy(project_policy: B2BPolicy, org_policy: OrgPolicy) -> None: + project_roles = set({r.role_id for r in project_policy.roles}) + project_resources = {r.resource_id: r for r in project_policy.resources} + + org_roles: Set[str] = set() + for role in org_policy.roles: + org_role_id = role.role_id + if org_role_id in org_roles: + raise Exception(f"Duplicate role {org_role_id} in Organization RBAC policy") + org_roles.add(org_role_id) + + if org_role_id in project_roles: + raise Exception(f"Role {org_role_id} already defined in Project RBAC policy") + + for permission in role.permissions: + resource_id = permission.resource_id + if not resource_id in project_resources: + raise Exception(f"Resource {resource_id} not defined in Project RBAC policy") + + if len(permission.actions) == 0: + raise Exception(f"No actions defined for role {org_role_id}, resource {resource_id}") + if len(permission.actions) == 1 and "*" == permission.actions[0]: + continue + if len(permission.actions) > 1 and "*" in permission.actions: + raise Exception("Wildcard actions must be the only action defined for a role and resource") + + project_resource = project_resources[resource_id] + for action in permission.actions: + if action.strip() == "": + raise Exception(f"Empty action on resource {resource_id} is not permitted") + + if not action in project_resource.actions: + raise Exception(f"Unknown action {action} defined on resource {resource_id}") + + return + + # ENDMANUAL(validate_org_policy) diff --git a/stytch/shared/tests/test_rbac_local.py b/stytch/shared/tests/test_rbac_local.py index a53f9cc..2c28509 100644 --- a/stytch/shared/tests/test_rbac_local.py +++ b/stytch/shared/tests/test_rbac_local.py @@ -1,7 +1,10 @@ import unittest +from stytch.b2b.api.rbac_organizations import Organizations from stytch.b2b.models.rbac import ( + OrgPolicy, Policy, + PolicyResource, PolicyRole, PolicyRolePermission, PolicyScope, @@ -376,3 +379,232 @@ def test_perform_consumer_scope_authorization_check(self) -> None: # Act perform_consumer_scope_authorization_check(self.policy, scopes, req) # Assertion is that no exception is raised + + +class TestRbacOrgPolicyValidations(unittest.TestCase): + def setUp(self) -> None: + self.sample_project_policy = Policy( + resources=[ + PolicyResource( + resource_id="document", + description="Documents", + actions=["read", "write", "delete"], + ), + PolicyResource( + resource_id="program", + description="An executable program", + actions=["read", "write", "execute"], + ), + ], + roles=[ + PolicyRole( + role_id="stytch_member", + description="member", + permissions=[ + PolicyRolePermission( + resource_id="document", + actions=["read", "write"], + ), + PolicyRolePermission( + resource_id="program", + actions=["read"], + ), + ], + ), + PolicyRole( + role_id="stytch_editor", + description="editor", + permissions=[ + PolicyRolePermission( + resource_id="document", + actions=["read", "write"], + ), + PolicyRolePermission( + resource_id="program", + actions=["read", "execute"], + ), + ], + ), + PolicyRole( + role_id="stytch_admin", + description="admin", + permissions=[ + PolicyRolePermission( + resource_id="document", + actions=["read", "write", "delete"], + ), + PolicyRolePermission( + resource_id="program", + actions=["*"], + ), + ], + ), + ], + scopes=[], # No scopes in Org RBAC policies. + ) + + def test_validate_org_rbac_policies(self) -> None: + with self.subTest("exception if a role is already defined in Project policy"): + with self.assertRaisesRegex(Exception, r"Role \w+ already defined in Project RBAC policy"): + Organizations.validate_org_policy( + project_policy=self.sample_project_policy, + org_policy=OrgPolicy( + roles=[ + PolicyRole( + role_id="stytch_editor", + description="", + permissions=[ + PolicyRolePermission(actions=["*"], resource_id="resource") + ], + ) + ] + ), + ) + + with self.subTest("exception if a role is already defined in Org policy"): + with self.assertRaisesRegex(Exception, r"Duplicate role \w+ in Organization RBAC policy"): + Organizations.validate_org_policy( + project_policy=self.sample_project_policy, + org_policy=OrgPolicy( + roles=[ + PolicyRole( + role_id="researcher", + description="", + permissions=[ + PolicyRolePermission(resource_id="document", actions=["*"]) + ], + ), + PolicyRole( + role_id="researcher", + description="", + permissions=[ + PolicyRolePermission(resource_id="document", actions=["*"]) + ], + ) + ] + ), + ) + + with self.subTest("exception if a role uses an undefined resource"): + with self.assertRaisesRegex(Exception, r"Resource \w+ not defined in Project RBAC policy"): + Organizations.validate_org_policy( + project_policy=self.sample_project_policy, + org_policy=OrgPolicy( + roles=[ + PolicyRole( + role_id="researcher", + description="", + permissions=[ + PolicyRolePermission(resource_id="computer", actions=["boot"]) + ], + ), + PolicyRole( + role_id="teacher", + description="", + permissions=[ + PolicyRolePermission(resource_id="document", actions=["*"]) + ], + ) + ] + ), + ) + + with self.subTest("exception if a role does not define actions for a permission"): + with self.assertRaisesRegex(Exception, r"No actions defined for role \w+, resource \w+"): + Organizations.validate_org_policy( + project_policy=self.sample_project_policy, + org_policy=OrgPolicy( + roles=[ + PolicyRole( + role_id="teacher", + description="", + permissions=[ + PolicyRolePermission(resource_id="document", actions=[]) + ], + ) + ] + ), + ) + + with self.subTest("exception if a role uses a wildcard with other actions"): + with self.assertRaisesRegex(Exception, + r"Wildcard actions must be the only action defined for a role and resource"): + Organizations.validate_org_policy( + project_policy=self.sample_project_policy, + org_policy=OrgPolicy( + roles=[ + PolicyRole( + role_id="teacher", + description="", + permissions=[ + PolicyRolePermission(resource_id="document", actions=["*", "read"]) + ], + ) + ] + ), + ) + + with self.subTest("exception an action is left empty"): + with self.assertRaisesRegex(Exception, r"Empty action on resource \w+ is not permitted"): + Organizations.validate_org_policy( + project_policy=self.sample_project_policy, + org_policy=OrgPolicy( + roles=[ + PolicyRole( + role_id="teacher", + description="", + permissions=[ + PolicyRolePermission(resource_id="document", actions=["", "read"]) + ], + ) + ] + ), + ) + + with self.subTest("exception if an unknown action is defined on a resource"): + with self.assertRaisesRegex(Exception, r"Unknown action \w+ defined on resource \w+"): + Organizations.validate_org_policy( + project_policy=self.sample_project_policy, + org_policy=OrgPolicy( + roles=[ + PolicyRole( + role_id="teacher", + description="", + permissions=[ + PolicyRolePermission(resource_id="document", actions=["read", "shred"]) + ], + ) + ] + ), + ) + + with self.subTest("success with a valid Org policy"): + # Assert no exception is raised. + Organizations.validate_org_policy( + project_policy=self.sample_project_policy, + org_policy=OrgPolicy( + roles=[ + PolicyRole( + role_id="teacher", + description="High school teacher", + permissions=[ + PolicyRolePermission(resource_id="document", actions=["*"]) + ], + ), + PolicyRole( + role_id="student", + description="High school student", + permissions=[ + PolicyRolePermission(resource_id="document", actions=["read"]) + ], + ), + PolicyRole( + role_id="sys_admin", + description="Network administrator", + permissions=[ + PolicyRolePermission(resource_id="program", actions=["read", "write", "execute"]) + ], + ) + ] + ), + )