From f75ee92951c66ae8f82aa780d7d41360531c3e43 Mon Sep 17 00:00:00 2001 From: Marcos Machado Date: Mon, 6 Apr 2026 15:13:21 -0700 Subject: [PATCH] feat(scc): add organization-level findings support Add optional organization_id parameter to top_vulnerability_findings and get_finding_remediation tools, enabling queries across all projects in an organization. Most SCC Enterprise customers manage findings at the org level rather than per-project. Closes #248 --- server/scc/scc_mcp.py | 41 +++++++++++++++++++++++++++++------------ 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/server/scc/scc_mcp.py b/server/scc/scc_mcp.py index 7a7cc123..5e8996c3 100644 --- a/server/scc/scc_mcp.py +++ b/server/scc/scc_mcp.py @@ -72,22 +72,29 @@ def proto_message_to_dict(message: Any) -> Dict[str, Any]: @mcp.tool() async def top_vulnerability_findings( - project_id: str, + project_id: str = None, + organization_id: str = None, max_findings: int = 20, ) -> Dict[str, Any]: """Name: top_vulnerability_findings - Description: Lists the top ACTIVE, HIGH or CRITICAL severity findings of class VULNERABILITY for a specific project, - sorted by Attack Exposure Score (descending). Includes the Attack Exposure score in the output if available. - Aids prioritization for remediation. + Description: Lists the top ACTIVE, HIGH or CRITICAL severity findings of class VULNERABILITY for a specific project + or organization, sorted by Attack Exposure Score (descending). Includes the Attack Exposure score in the + output if available. Aids prioritization for remediation. Parameters: - project_id (required): The Google Cloud project ID (e.g., 'my-gcp-project'). + project_id (optional): The Google Cloud project ID (e.g., 'my-gcp-project'). Either project_id or organization_id must be provided. + organization_id (optional): The Google Cloud organization ID (e.g., '123456789'). When provided, queries findings across all projects in the organization. max_findings (optional): The maximum number of findings to return. Defaults to 20. """ if not scc_client: return {"error": "Security Center Client not initialized."} - parent = f"projects/{project_id}/sources/-" # Search across all sources in the project + if organization_id: + parent = f"organizations/{organization_id}/sources/-" + elif project_id: + parent = f"projects/{project_id}/sources/-" + else: + return {"error": "Either project_id or organization_id must be provided."} # Filter for active, high/critical vulnerability findings filter_str = 'state="ACTIVE" AND findingClass="VULNERABILITY" AND (severity="HIGH" OR severity="CRITICAL")' @@ -169,19 +176,21 @@ def sort_key(f): @mcp.tool() async def get_finding_remediation( - project_id: str, + project_id: str = None, + organization_id: str = None, resource_name: str = None, category: str = None, finding_id: str = None ) -> Dict[str, Any]: """Name: get_finding_remediation - Description: Gets the remediation steps (nextSteps) for a specific finding within a project, + Description: Gets the remediation steps (nextSteps) for a specific finding within a project or organization, along with details of the affected resource fetched from Cloud Asset Inventory (CAI). The finding can be identified either by its resource_name and category (for ACTIVE findings) or directly by its finding_id (regardless of state). Parameters: - project_id (required): The Google Cloud project ID (e.g., 'my-gcp-project'). + project_id (optional): The Google Cloud project ID (e.g., 'my-gcp-project'). Either project_id or organization_id must be provided. + organization_id (optional): The Google Cloud organization ID (e.g., '123456789'). When provided, queries findings across all projects in the organization. resource_name (optional): The full resource name associated with the finding. (e.g., '//container.googleapis.com/projects/my-project/locations/us-central1/clusters/my-cluster') category (optional): The category of the finding (e.g., 'GKE_SECURITY_BULLETIN'). @@ -196,16 +205,24 @@ async def get_finding_remediation( # Input validation if not resource_name and not category and not finding_id: return {"error": "Missing required parameters", "details": "Either resource_name and category or finding_id must be provided."} + if not project_id and not organization_id: + return {"error": "Either project_id or organization_id must be provided."} first_finding_result = None scc_error = None - parent = f"projects/{project_id}/sources/-" # Define parent once + if organization_id: + parent = f"organizations/{organization_id}/sources/-" + else: + parent = f"projects/{project_id}/sources/-" filter_str = "" # Initialize filter string try: if finding_id: # --- Use list_findings with name filter for finding_id (V1 Client) --- - finding_name_to_filter = f"projects/{project_id}/sources/-/findings/{finding_id}" + if organization_id: + finding_name_to_filter = f"organizations/{organization_id}/sources/-/findings/{finding_id}" + else: + finding_name_to_filter = f"projects/{project_id}/sources/-/findings/{finding_id}" filter_str = f'name="{finding_name_to_filter}"' logger.info(f"Attempting to list findings by name filter: {filter_str}") scc_request_args = { @@ -257,7 +274,7 @@ async def get_finding_remediation( asset_details = None if resource_name_from_finding: try: - cai_scope = f"projects/{project_id}" + cai_scope = f"organizations/{organization_id}" if organization_id else f"projects/{project_id}" cai_request = asset_v1.SearchAllResourcesRequest( scope=cai_scope, query=f'name="{resource_name_from_finding}"',