From b02c3a81ec0f97c14c0dddfc4d6a30accba3656d Mon Sep 17 00:00:00 2001 From: Matt Dean Date: Mon, 19 May 2025 20:23:33 +0100 Subject: [PATCH 1/5] [NRL-1351] Add script to manage S3 Permissions --- scripts/manage_permissions.py | 51 +++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 scripts/manage_permissions.py diff --git a/scripts/manage_permissions.py b/scripts/manage_permissions.py new file mode 100644 index 000000000..47d9fb1c0 --- /dev/null +++ b/scripts/manage_permissions.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python + +import os + +import fire +from aws_session_assume import get_boto_session + +nrl_env = os.getenv("ENV", "dev") +nrl_auth_bucket_name = os.getenv( + "NRL_AUTH_BUCKET_NAME", f"nhsd-nrlf--{nrl_env}-authorization-store" +) + + +def _list_s3_files(file_key_prefix: str) -> list[str]: + # This function would contain the logic to list files in S3 + print(f"Listing files in S3 with prefix {file_key_prefix}...") + return [] + + +def _get_perms_from_s3(file_key: str) -> list[str]: + # This function would contain the logic to get permissions from S3 + print(f"Getting permissions from S3 for {file_key}...") + return [] + + +def list_apps() -> list[str]: + # This function would contain the logic to list apps + print("Listing all apps...") + return [] + + +def list_orgs(app_id: str) -> list[str]: + # This function would contain the logic to list organizations + print(f"Listing organizations for {app_id}...") + return [] + + +def get(app_id: str, org_ods: str) -> list[str]: + # This function would contain the logic to show current permissions + print(f"The current permissions for {app_id}/{org_ods} are:") + return [] + + +def set(app_id: str, org_ods: str, pointer_types: list[str]) -> list[str]: + # This function would contain the logic to set permissions + print(f"Setting permissions for {app_id}/{org_ods} to {pointer_types}...") + return [] + + +if __name__ == "__main__": + fire.Fire() From 818adda712c66c93421c3e6c887e063dd9642042 Mon Sep 17 00:00:00 2001 From: Matt Dean Date: Wed, 28 May 2025 14:12:23 +0100 Subject: [PATCH 2/5] [NRL-1351] Implement logic in script to help with permissions --- scripts/manage_permissions.py | 109 ++++++++++++++++++++++++++++------ 1 file changed, 90 insertions(+), 19 deletions(-) diff --git a/scripts/manage_permissions.py b/scripts/manage_permissions.py index 47d9fb1c0..e3ba8248c 100644 --- a/scripts/manage_permissions.py +++ b/scripts/manage_permissions.py @@ -1,47 +1,118 @@ #!/usr/bin/env python +import json import os import fire from aws_session_assume import get_boto_session +from nrlf.core.constants import TYPE_ATTRIBUTES + nrl_env = os.getenv("ENV", "dev") nrl_auth_bucket_name = os.getenv( "NRL_AUTH_BUCKET_NAME", f"nhsd-nrlf--{nrl_env}-authorization-store" ) +print(f"Using NRL environment: {nrl_env}") +print(f"Using NRL auth bucket: {nrl_auth_bucket_name}") +print() -def _list_s3_files(file_key_prefix: str) -> list[str]: - # This function would contain the logic to list files in S3 - print(f"Listing files in S3 with prefix {file_key_prefix}...") - return [] + +def _get_s3_client(): + boto_session = get_boto_session(nrl_env) + return boto_session.client("s3") + + +def _list_s3_keys(file_key_prefix: str) -> list[str]: + s3 = _get_s3_client() + paginator = s3.get_paginator("list_objects_v2") + + params = { + "Bucket": nrl_auth_bucket_name, + "Prefix": file_key_prefix, + } + + page_iterator = paginator.paginate(**params) + keys = [] + for page in page_iterator: + if "Contents" in page: + keys.extend([item["Key"] for item in page["Contents"]]) + + if not keys: + print(f"No files found with prefix: {file_key_prefix}") + return [] + + return keys def _get_perms_from_s3(file_key: str) -> list[str]: - # This function would contain the logic to get permissions from S3 - print(f"Getting permissions from S3 for {file_key}...") - return [] + s3 = _get_s3_client() + item = s3.get_object(Bucket=nrl_auth_bucket_name, Key=file_key) -def list_apps() -> list[str]: - # This function would contain the logic to list apps - print("Listing all apps...") - return [] + if not item: + print(f"No permissions found for {file_key}.") + return [] + return item["Body"].read().decode("utf-8") -def list_orgs(app_id: str) -> list[str]: - # This function would contain the logic to list organizations - print(f"Listing organizations for {app_id}...") - return [] +def list_apps() -> set[str]: + keys = _list_s3_keys("") + apps = set([key.split("/")[0] for key in keys]) + + if not apps: + print("No applications found in the bucket.") + return set() + + print(f"Listing all {len(apps)} apps in bucket...") + return apps + + +def list_orgs(app_id: str) -> set[str]: + keys = _list_s3_keys(f"{app_id}/") + orgs = [ + key.split("/", maxsplit=2)[1].removesuffix(".json") + for key in keys + if key and key.endswith(".json") + ] + + if not orgs: + print(f"No organizations found for app {app_id}.") + return set() + + print(f"Listing {len(orgs)} organizations for {app_id}...") + return orgs + + +def get_perms(app_id: str, org_ods: str) -> list[str]: + perms = _get_perms_from_s3(f"{app_id}/{org_ods}.json") + + if not perms: + print(f"No permissions file found for {app_id}/{org_ods}.") + return [] + + pointertype_perms = json.loads(perms) + if not pointertype_perms: + print(f"No pointer-types found in permission file for {app_id}/{org_ods}.") + return [] + + type_data = { + pointertype_perm: TYPE_ATTRIBUTES.get( + pointertype_perm, {"display": "Unknown type"} + ) + for pointertype_perm in pointertype_perms + } + types = [ + f"{type_data[pointertype_perm]['display']} ({pointertype_perm})" + for pointertype_perm in pointertype_perms + ] -def get(app_id: str, org_ods: str) -> list[str]: - # This function would contain the logic to show current permissions print(f"The current permissions for {app_id}/{org_ods} are:") - return [] + return types -def set(app_id: str, org_ods: str, pointer_types: list[str]) -> list[str]: +def set_perms(app_id: str, org_ods: str, pointer_types: list[str]) -> list[str]: # This function would contain the logic to set permissions print(f"Setting permissions for {app_id}/{org_ods} to {pointer_types}...") return [] From 4d2f591df68066a1272694cdde401b925802e112 Mon Sep 17 00:00:00 2001 From: Matt Dean Date: Wed, 28 May 2025 16:07:18 +0100 Subject: [PATCH 3/5] [NRL-1351] Add setter to manage permissions script --- scripts/manage_permissions.py | 63 ++++++++++++++++++++++++++--------- 1 file changed, 48 insertions(+), 15 deletions(-) mode change 100644 => 100755 scripts/manage_permissions.py diff --git a/scripts/manage_permissions.py b/scripts/manage_permissions.py old mode 100644 new mode 100755 index e3ba8248c..9e1dc3fbf --- a/scripts/manage_permissions.py +++ b/scripts/manage_permissions.py @@ -45,31 +45,35 @@ def _list_s3_keys(file_key_prefix: str) -> list[str]: return keys -def _get_perms_from_s3(file_key: str) -> list[str]: +def _get_perms_from_s3(file_key: str) -> str | None: s3 = _get_s3_client() - item = s3.get_object(Bucket=nrl_auth_bucket_name, Key=file_key) + try: + item = s3.get_object(Bucket=nrl_auth_bucket_name, Key=file_key) + except s3.exceptions.NoSuchKey: + print(f"Permissions file {file_key} does not exist in the bucket.") + return None - if not item: - print(f"No permissions found for {file_key}.") - return [] + if "Body" not in item: + print(f"No body found for permissions file {file_key}.") + return None return item["Body"].read().decode("utf-8") -def list_apps() -> set[str]: +def list_apps() -> list[str]: keys = _list_s3_keys("") - apps = set([key.split("/")[0] for key in keys]) + apps = [key.split("/")[0] for key in keys] if not apps: print("No applications found in the bucket.") - return set() + return [] print(f"Listing all {len(apps)} apps in bucket...") return apps -def list_orgs(app_id: str) -> set[str]: +def list_orgs(app_id: str) -> list[str]: keys = _list_s3_keys(f"{app_id}/") orgs = [ key.split("/", maxsplit=2)[1].removesuffix(".json") @@ -79,13 +83,13 @@ def list_orgs(app_id: str) -> set[str]: if not orgs: print(f"No organizations found for app {app_id}.") - return set() + return [] print(f"Listing {len(orgs)} organizations for {app_id}...") return orgs -def get_perms(app_id: str, org_ods: str) -> list[str]: +def get(app_id: str, org_ods: str) -> list[str]: perms = _get_perms_from_s3(f"{app_id}/{org_ods}.json") if not perms: @@ -112,10 +116,39 @@ def get_perms(app_id: str, org_ods: str) -> list[str]: return types -def set_perms(app_id: str, org_ods: str, pointer_types: list[str]) -> list[str]: - # This function would contain the logic to set permissions - print(f"Setting permissions for {app_id}/{org_ods} to {pointer_types}...") - return [] +def set(app_id: str, org_ods: str, *pointer_types: str) -> list[str]: + if not pointer_types: + print( + "No pointer types provided. Please specify at least one pointer type or use clear_perms command." + ) + return [] + + unknown_types = [pt for pt in pointer_types if pt not in TYPE_ATTRIBUTES] + if unknown_types: + print(f"Warning: Unknown pointer types provided: {', '.join(unknown_types)}") + print() + + permissions_content = json.dumps(pointer_types, indent=4) + s3 = _get_s3_client() + s3.put_object( + Bucket=nrl_auth_bucket_name, + Key=f"{app_id}/{org_ods}.json", + Body=permissions_content, + ContentType="application/json", + ) + + return get(app_id, org_ods) + + +def clear(app_id: str, org_ods: str) -> None: + s3 = _get_s3_client() + s3.put_object( + Bucket=nrl_auth_bucket_name, + Key=f"{app_id}/{org_ods}.json", + Body="[]", + ContentType="application/json", + ) + print(f"Cleared permissions for {app_id}/{org_ods}.") if __name__ == "__main__": From a561c935179678b45fe3a845d4932315cbbb03fa Mon Sep 17 00:00:00 2001 From: Matt Dean Date: Fri, 18 Jul 2025 13:24:41 +0100 Subject: [PATCH 4/5] [NRL-1351] Finish off CLI script to manage NRL pointer-type permissions --- scripts/manage_permissions.py | 122 ++++++++++++++++++++++++++++------ 1 file changed, 101 insertions(+), 21 deletions(-) diff --git a/scripts/manage_permissions.py b/scripts/manage_permissions.py index 9e1dc3fbf..3b5f91e28 100755 --- a/scripts/manage_permissions.py +++ b/scripts/manage_permissions.py @@ -13,8 +13,15 @@ "NRL_AUTH_BUCKET_NAME", f"nhsd-nrlf--{nrl_env}-authorization-store" ) +COMPARE_AND_CONFIRM = ( + True + if nrl_env == "prod" + else os.getenv("COMPARE_AND_CONFIRM", "false").lower() == "true" +) + print(f"Using NRL environment: {nrl_env}") print(f"Using NRL auth bucket: {nrl_auth_bucket_name}") +print(f"Compare and confirm mode: {COMPARE_AND_CONFIRM}") print() @@ -33,7 +40,7 @@ def _list_s3_keys(file_key_prefix: str) -> list[str]: } page_iterator = paginator.paginate(**params) - keys = [] + keys: list[str] = [] for page in page_iterator: if "Contents" in page: keys.extend([item["Key"] for item in page["Contents"]]) @@ -61,19 +68,26 @@ def _get_perms_from_s3(file_key: str) -> str | None: return item["Body"].read().decode("utf-8") -def list_apps() -> list[str]: +def list_apps() -> None: + """ + List all applications in the NRL environment. + """ keys = _list_s3_keys("") - apps = [key.split("/")[0] for key in keys] + apps = set([key.split("/")[0] for key in keys]) if not apps: print("No applications found in the bucket.") return [] - print(f"Listing all {len(apps)} apps in bucket...") - return apps + print(f"There are {len(apps)} apps in {nrl_env} env:") + for app in apps: + print(f"- {app}") -def list_orgs(app_id: str) -> list[str]: +def list_orgs(app_id: str) -> None: + """ + List all organizations for a specific application. + """ keys = _list_s3_keys(f"{app_id}/") orgs = [ key.split("/", maxsplit=2)[1].removesuffix(".json") @@ -83,23 +97,36 @@ def list_orgs(app_id: str) -> list[str]: if not orgs: print(f"No organizations found for app {app_id}.") - return [] - print(f"Listing {len(orgs)} organizations for {app_id}...") - return orgs + print(f"There are {len(orgs)} organizations for app {app_id}:") + for org in orgs: + print(f"- {org}") + + +def list_allowed_types() -> None: + """ + List all pointer types that can be used in permissions. + """ + print("The following pointer-types are allowed:") + + for pointer_type, attributes in TYPE_ATTRIBUTES.items(): + print(f"- %-45s (%s)" % (pointer_type, attributes["display"][:45])) -def get(app_id: str, org_ods: str) -> list[str]: +def show_perms(app_id: str, org_ods: str) -> None: + """ + Show the permissions for a specific application and organization. + """ perms = _get_perms_from_s3(f"{app_id}/{org_ods}.json") if not perms: print(f"No permissions file found for {app_id}/{org_ods}.") - return [] + return pointertype_perms = json.loads(perms) if not pointertype_perms: print(f"No pointer-types found in permission file for {app_id}/{org_ods}.") - return [] + return type_data = { pointertype_perm: TYPE_ATTRIBUTES.get( @@ -108,27 +135,63 @@ def get(app_id: str, org_ods: str) -> list[str]: for pointertype_perm in pointertype_perms } types = [ - f"{type_data[pointertype_perm]['display']} ({pointertype_perm})" + "%-45s (%s)" % (type_data[pointertype_perm]["display"][:44], pointertype_perm) for pointertype_perm in pointertype_perms ] - print(f"The current permissions for {app_id}/{org_ods} are:") - return types + print(f"{app_id}/{org_ods} is allowed to access these pointer-types:") + for type_display in types: + print(f"- {type_display}") -def set(app_id: str, org_ods: str, *pointer_types: str) -> list[str]: +def set_perms(app_id: str, org_ods: str, *pointer_types: str) -> None: + """ + Set permissions for an application and organization to access specific pointer types. + """ if not pointer_types: print( "No pointer types provided. Please specify at least one pointer type or use clear_perms command." ) - return [] + return + + if len(pointer_types) == 1 and pointer_types[0] == "all": + print("Setting permissions for access to all pointer types.") + pointer_types = tuple(TYPE_ATTRIBUTES.keys()) unknown_types = [pt for pt in pointer_types if pt not in TYPE_ATTRIBUTES] if unknown_types: - print(f"Warning: Unknown pointer types provided: {', '.join(unknown_types)}") + print(f"Error: Unknown pointer types provided: {', '.join(unknown_types)}") print() + return permissions_content = json.dumps(pointer_types, indent=4) + + if COMPARE_AND_CONFIRM: + current_perms = _get_perms_from_s3(f"{app_id}/{org_ods}.json") + if current_perms == permissions_content: + print( + f"No changes needed for {app_id}/{org_ods}. Current permissions match the new ones." + ) + return + + print() + print(f"Current permissions for {app_id}/{org_ods}:") + print(current_perms if current_perms else "No permissions set.") + + print() + print("New permissions to be set to:") + print(f"{permissions_content}") + + print() + confirm = ( + input("Do you want to proceed with these changes? (yes/NO): ") + .strip() + .lower() + ) + if confirm != "yes": + print("Operation cancelled at user request.") + return + s3 = _get_s3_client() s3.put_object( Bucket=nrl_auth_bucket_name, @@ -137,10 +200,18 @@ def set(app_id: str, org_ods: str, *pointer_types: str) -> list[str]: ContentType="application/json", ) - return get(app_id, org_ods) + print() + print(f"Set permissions for {app_id}/{org_ods}") + print() + show_perms(app_id, org_ods) -def clear(app_id: str, org_ods: str) -> None: + +def clear_perms(app_id: str, org_ods: str) -> None: + """ + Clear permissions for an application and organization. + This will remove all permissions for the specified app and org. + """ s3 = _get_s3_client() s3.put_object( Bucket=nrl_auth_bucket_name, @@ -152,4 +223,13 @@ def clear(app_id: str, org_ods: str) -> None: if __name__ == "__main__": - fire.Fire() + fire.Fire( + { + "list_apps": list_apps, + "list_orgs": list_orgs, + "list_allowed_types": list_allowed_types, + "show_perms": show_perms, + "set_perms": set_perms, + "clear_perms": clear_perms, + } + ) From 2ac7c6bfe21f6af84372c7de4200bd6da251fac3 Mon Sep 17 00:00:00 2001 From: Matt Dean Date: Mon, 21 Jul 2025 08:06:27 +0100 Subject: [PATCH 5/5] [NRL-1351] Add confirm on clearing perms. Fix sonar warnings --- scripts/manage_permissions.py | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/scripts/manage_permissions.py b/scripts/manage_permissions.py index 3b5f91e28..42198a997 100755 --- a/scripts/manage_permissions.py +++ b/scripts/manage_permissions.py @@ -73,11 +73,11 @@ def list_apps() -> None: List all applications in the NRL environment. """ keys = _list_s3_keys("") - apps = set([key.split("/")[0] for key in keys]) + apps = {key.split("/")[0] for key in keys} if not apps: print("No applications found in the bucket.") - return [] + return print(f"There are {len(apps)} apps in {nrl_env} env:") for app in apps: @@ -110,7 +110,7 @@ def list_allowed_types() -> None: print("The following pointer-types are allowed:") for pointer_type, attributes in TYPE_ATTRIBUTES.items(): - print(f"- %-45s (%s)" % (pointer_type, attributes["display"][:45])) + print("- %-45s (%s)" % (pointer_type, attributes["display"][:45])) def show_perms(app_id: str, org_ods: str) -> None: @@ -212,6 +212,28 @@ def clear_perms(app_id: str, org_ods: str) -> None: Clear permissions for an application and organization. This will remove all permissions for the specified app and org. """ + if COMPARE_AND_CONFIRM: + current_perms = _get_perms_from_s3(f"{app_id}/{org_ods}.json") + if not current_perms or current_perms == "[]": + print( + f"No need to clear permissions for {app_id}/{org_ods} as it currently has no permissions set." + ) + return + + print() + print(f"Current permissions for {app_id}/{org_ods}:") + print(current_perms) + + print() + confirm = ( + input("Are you SURE you want to clear these permissions? (yes/NO): ") + .strip() + .lower() + ) + if confirm != "yes": + print("Operation cancelled at user request.") + return + s3 = _get_s3_client() s3.put_object( Bucket=nrl_auth_bucket_name,