Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions server/scc/scc_mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,29 @@ def proto_message_to_dict(message: Any) -> Dict[str, Any]:

@mcp.tool()
async def top_vulnerability_findings(
project_id: str,
project_id: str = None,
organization_id: str = None,
max_findings: int = 20,
) -> Dict[str, Any]:
"""Name: top_vulnerability_findings

Description: Lists the top ACTIVE, HIGH or CRITICAL severity findings of class VULNERABILITY for a specific project,
sorted by Attack Exposure Score (descending). Includes the Attack Exposure score in the output if available.
Aids prioritization for remediation.
Description: Lists the top ACTIVE, HIGH or CRITICAL severity findings of class VULNERABILITY for a specific project
or organization, sorted by Attack Exposure Score (descending). Includes the Attack Exposure score in the
output if available. Aids prioritization for remediation.
Parameters:
project_id (required): The Google Cloud project ID (e.g., 'my-gcp-project').
project_id (optional): The Google Cloud project ID (e.g., 'my-gcp-project'). Either project_id or organization_id must be provided.
organization_id (optional): The Google Cloud organization ID (e.g., '123456789'). When provided, queries findings across all projects in the organization.
max_findings (optional): The maximum number of findings to return. Defaults to 20.
"""
if not scc_client:
return {"error": "Security Center Client not initialized."}

parent = f"projects/{project_id}/sources/-" # Search across all sources in the project
if organization_id:
parent = f"organizations/{organization_id}/sources/-"
elif project_id:
parent = f"projects/{project_id}/sources/-"
else:
return {"error": "Either project_id or organization_id must be provided."}
# Filter for active, high/critical vulnerability findings
filter_str = 'state="ACTIVE" AND findingClass="VULNERABILITY" AND (severity="HIGH" OR severity="CRITICAL")'

Expand Down Expand Up @@ -169,19 +176,21 @@ def sort_key(f):

@mcp.tool()
async def get_finding_remediation(
project_id: str,
project_id: str = None,
organization_id: str = None,
resource_name: str = None,
category: str = None,
finding_id: str = None
) -> Dict[str, Any]:
"""Name: get_finding_remediation

Description: Gets the remediation steps (nextSteps) for a specific finding within a project,
Description: Gets the remediation steps (nextSteps) for a specific finding within a project or organization,
along with details of the affected resource fetched from Cloud Asset Inventory (CAI).
The finding can be identified either by its resource_name and category (for ACTIVE findings)
or directly by its finding_id (regardless of state).
Parameters:
project_id (required): The Google Cloud project ID (e.g., 'my-gcp-project').
project_id (optional): The Google Cloud project ID (e.g., 'my-gcp-project'). Either project_id or organization_id must be provided.
organization_id (optional): The Google Cloud organization ID (e.g., '123456789'). When provided, queries findings across all projects in the organization.
resource_name (optional): The full resource name associated with the finding.
(e.g., '//container.googleapis.com/projects/my-project/locations/us-central1/clusters/my-cluster')
category (optional): The category of the finding (e.g., 'GKE_SECURITY_BULLETIN').
Expand All @@ -196,16 +205,24 @@ async def get_finding_remediation(
# Input validation
if not resource_name and not category and not finding_id:
return {"error": "Missing required parameters", "details": "Either resource_name and category or finding_id must be provided."}
if not project_id and not organization_id:
return {"error": "Either project_id or organization_id must be provided."}

first_finding_result = None
scc_error = None
parent = f"projects/{project_id}/sources/-" # Define parent once
if organization_id:
parent = f"organizations/{organization_id}/sources/-"
else:
parent = f"projects/{project_id}/sources/-"
filter_str = "" # Initialize filter string

try:
if finding_id:
# --- Use list_findings with name filter for finding_id (V1 Client) ---
finding_name_to_filter = f"projects/{project_id}/sources/-/findings/{finding_id}"
if organization_id:
finding_name_to_filter = f"organizations/{organization_id}/sources/-/findings/{finding_id}"
else:
finding_name_to_filter = f"projects/{project_id}/sources/-/findings/{finding_id}"
filter_str = f'name="{finding_name_to_filter}"'
logger.info(f"Attempting to list findings by name filter: {filter_str}")
scc_request_args = {
Expand Down Expand Up @@ -257,7 +274,7 @@ async def get_finding_remediation(
asset_details = None
if resource_name_from_finding:
try:
cai_scope = f"projects/{project_id}"
cai_scope = f"organizations/{organization_id}" if organization_id else f"projects/{project_id}"
cai_request = asset_v1.SearchAllResourcesRequest(
scope=cai_scope,
query=f'name="{resource_name_from_finding}"',
Expand Down