Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions stytch/b2b/api/rbac_organizations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@

from __future__ import annotations

from typing import Any, Dict, Union
from typing import Any, Dict, Set, Union

from stytch.b2b.models.rbac import OrgPolicy
from stytch.b2b.models.rbac import (
OrgPolicy,
Policy as B2BPolicy,
PolicyResource,
)
from stytch.b2b.models.rbac_organizations import (
GetOrgPolicyResponse,
SetOrgPolicyResponse,
Expand Down Expand Up @@ -162,3 +166,45 @@ async def set_org_policy_async(
)
res = await self.async_client.put(url, data, headers)
return SetOrgPolicyResponse.from_json(res.response.status, res.json)

# MANUAL(validate_org_policy)(SERVICE_METHOD)
# ADDIMPORT: from stytch.b2b.models.rbac import Policy as B2BPolicy
# ADDIMPORT: from typing import Set
@staticmethod
def validate_org_policy(project_policy: B2BPolicy, org_policy: OrgPolicy) -> None:
Comment on lines +173 to +174
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[dust] Totally fine here, but there are purists that try to avoid ever writing staticmethod under the thought that it should either: [1] be a classmethod that cares about the class in some way or [2] actually belongs as a free function or in another class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to leave this as-is and risk the wrath of sectarian Python violence.

project_roles = set({r.role_id for r in project_policy.roles})
project_resources = {r.resource_id: r for r in project_policy.resources}

org_roles: Set[str] = set()
for role in org_policy.roles:
org_role_id = role.role_id
if org_role_id in org_roles:
raise Exception(f"Duplicate role {org_role_id} in Organization RBAC policy")
org_roles.add(org_role_id)

if org_role_id in project_roles:
raise Exception(f"Role {org_role_id} already defined in Project RBAC policy")

for permission in role.permissions:
resource_id = permission.resource_id
if not resource_id in project_resources:
raise Exception(f"Resource {resource_id} not defined in Project RBAC policy")

if len(permission.actions) == 0:
raise Exception(f"No actions defined for role {org_role_id}, resource {resource_id}")
if len(permission.actions) == 1 and "*" == permission.actions[0]:
continue
if len(permission.actions) > 1 and "*" in permission.actions:
raise Exception("Wildcard actions must be the only action defined for a role and resource")

project_resource = project_resources[resource_id]
for action in permission.actions:
if action.strip() == "":
raise Exception(f"Empty action on resource {resource_id} is not permitted")

if not action in project_resource.actions:
raise Exception(f"Unknown action {action} defined on resource {resource_id}")

return

# ENDMANUAL(validate_org_policy)
232 changes: 232 additions & 0 deletions stytch/shared/tests/test_rbac_local.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import unittest

from stytch.b2b.api.rbac_organizations import Organizations
from stytch.b2b.models.rbac import (
OrgPolicy,
Policy,
PolicyResource,
PolicyRole,
PolicyRolePermission,
PolicyScope,
Expand Down Expand Up @@ -376,3 +379,232 @@ def test_perform_consumer_scope_authorization_check(self) -> None:
# Act
perform_consumer_scope_authorization_check(self.policy, scopes, req)
# Assertion is that no exception is raised


class TestRbacOrgPolicyValidations(unittest.TestCase):
def setUp(self) -> None:
self.sample_project_policy = Policy(
resources=[
PolicyResource(
resource_id="document",
description="Documents",
actions=["read", "write", "delete"],
),
PolicyResource(
resource_id="program",
description="An executable program",
actions=["read", "write", "execute"],
),
],
roles=[
PolicyRole(
role_id="stytch_member",
description="member",
permissions=[
PolicyRolePermission(
resource_id="document",
actions=["read", "write"],
),
PolicyRolePermission(
resource_id="program",
actions=["read"],
),
],
),
PolicyRole(
role_id="stytch_editor",
description="editor",
permissions=[
PolicyRolePermission(
resource_id="document",
actions=["read", "write"],
),
PolicyRolePermission(
resource_id="program",
actions=["read", "execute"],
),
],
),
PolicyRole(
role_id="stytch_admin",
description="admin",
permissions=[
PolicyRolePermission(
resource_id="document",
actions=["read", "write", "delete"],
),
PolicyRolePermission(
resource_id="program",
actions=["*"],
),
],
),
],
scopes=[], # No scopes in Org RBAC policies.
)

def test_validate_org_rbac_policies(self) -> None:
with self.subTest("exception if a role is already defined in Project policy"):
with self.assertRaisesRegex(Exception, r"Role \w+ already defined in Project RBAC policy"):
Organizations.validate_org_policy(
project_policy=self.sample_project_policy,
org_policy=OrgPolicy(
roles=[
PolicyRole(
role_id="stytch_editor",
description="",
permissions=[
PolicyRolePermission(actions=["*"], resource_id="resource")
],
)
]
),
)

with self.subTest("exception if a role is already defined in Org policy"):
with self.assertRaisesRegex(Exception, r"Duplicate role \w+ in Organization RBAC policy"):
Organizations.validate_org_policy(
project_policy=self.sample_project_policy,
org_policy=OrgPolicy(
roles=[
PolicyRole(
role_id="researcher",
description="",
permissions=[
PolicyRolePermission(resource_id="document", actions=["*"])
],
),
PolicyRole(
role_id="researcher",
description="",
permissions=[
PolicyRolePermission(resource_id="document", actions=["*"])
],
)
]
),
)

with self.subTest("exception if a role uses an undefined resource"):
with self.assertRaisesRegex(Exception, r"Resource \w+ not defined in Project RBAC policy"):
Organizations.validate_org_policy(
project_policy=self.sample_project_policy,
org_policy=OrgPolicy(
roles=[
PolicyRole(
role_id="researcher",
description="",
permissions=[
PolicyRolePermission(resource_id="computer", actions=["boot"])
],
),
PolicyRole(
role_id="teacher",
description="",
permissions=[
PolicyRolePermission(resource_id="document", actions=["*"])
],
)
]
),
)

with self.subTest("exception if a role does not define actions for a permission"):
with self.assertRaisesRegex(Exception, r"No actions defined for role \w+, resource \w+"):
Organizations.validate_org_policy(
project_policy=self.sample_project_policy,
org_policy=OrgPolicy(
roles=[
PolicyRole(
role_id="teacher",
description="",
permissions=[
PolicyRolePermission(resource_id="document", actions=[])
],
)
]
),
)

with self.subTest("exception if a role uses a wildcard with other actions"):
with self.assertRaisesRegex(Exception,
r"Wildcard actions must be the only action defined for a role and resource"):
Organizations.validate_org_policy(
project_policy=self.sample_project_policy,
org_policy=OrgPolicy(
roles=[
PolicyRole(
role_id="teacher",
description="",
permissions=[
PolicyRolePermission(resource_id="document", actions=["*", "read"])
],
)
]
),
)

with self.subTest("exception an action is left empty"):
with self.assertRaisesRegex(Exception, r"Empty action on resource \w+ is not permitted"):
Organizations.validate_org_policy(
project_policy=self.sample_project_policy,
org_policy=OrgPolicy(
roles=[
PolicyRole(
role_id="teacher",
description="",
permissions=[
PolicyRolePermission(resource_id="document", actions=["", "read"])
],
)
]
),
)

with self.subTest("exception if an unknown action is defined on a resource"):
with self.assertRaisesRegex(Exception, r"Unknown action \w+ defined on resource \w+"):
Organizations.validate_org_policy(
project_policy=self.sample_project_policy,
org_policy=OrgPolicy(
roles=[
PolicyRole(
role_id="teacher",
description="",
permissions=[
PolicyRolePermission(resource_id="document", actions=["read", "shred"])
],
)
]
),
)

with self.subTest("success with a valid Org policy"):
# Assert no exception is raised.
Organizations.validate_org_policy(
project_policy=self.sample_project_policy,
org_policy=OrgPolicy(
roles=[
PolicyRole(
role_id="teacher",
description="High school teacher",
permissions=[
PolicyRolePermission(resource_id="document", actions=["*"])
],
),
PolicyRole(
role_id="student",
description="High school student",
permissions=[
PolicyRolePermission(resource_id="document", actions=["read"])
],
),
PolicyRole(
role_id="sys_admin",
description="Network administrator",
permissions=[
PolicyRolePermission(resource_id="program", actions=["read", "write", "execute"])
],
)
]
),
)