From ed18a7a98b3a045dc87fd14cb77b3c68875a25c9 Mon Sep 17 00:00:00 2001 From: Josh Liburdi Date: Sun, 15 Feb 2026 14:44:53 -0800 Subject: [PATCH] feat: Add SOAR MCP Case Management Tools Add five new case management tools to the SOAR MCP server: - close_case: Close a case with root cause, reason, and comment - assign_case: Assign a user to a case - change_case_stage: Update the workflow stage of a case - add_case_tag: Add a tag to a case - remove_case_tag: Remove a tag from a case --- docs/servers/secops_soar_mcp.md | 123 ++++++ extensions/google-secops/TOOL_MAPPING.md | 6 +- .../secops_soar_mcp/case_management.py | 393 ++++++++++++++++++ .../secops_soar_mcp/utils/consts.py | 7 + 4 files changed, 528 insertions(+), 1 deletion(-) diff --git a/docs/servers/secops_soar_mcp.md b/docs/servers/secops_soar_mcp.md index 0c2e1ac9..34c83f24 100644 --- a/docs/servers/secops_soar_mcp.md +++ b/docs/servers/secops_soar_mcp.md @@ -123,6 +123,27 @@ The server supports two authentication methods: These tools are always available. +- **`create_case(name, priority=None, description=None, environment=None)`** + - **Description:** Creates a new manual case in the SOAR platform for tracking a security incident or investigation. + - **Parameters:** + - `name` (required): The name or title of the case. + - `priority` (optional): The priority level (e.g., "PriorityInfo", "PriorityLow", "PriorityMedium", "PriorityHigh", "PriorityCritical"). + - `description` (optional): A description providing context about the security incident. + - `environment` (optional): The environment for the case. + - **Returns:** The newly created case details including its assigned ID. + - **Return Example:** + ```json + { + "id": 12350, + "identifier": "SEC-12350", + "name": "Suspicious Login Attempts from External IP", + "description": "Multiple failed login attempts detected from IP 203.0.113.50", + "priority": "PriorityHigh", + "status": "New", + "creationTime": "2023-09-15T16:00:00Z" + } + ``` + - **`list_cases()`** - **Description:** Lists available cases in the SOAR platform. - **Returns:** A list of cases with basic information like ID, name, status, and priority. @@ -270,6 +291,108 @@ These tools are always available. } ``` +- **`update_case_description(case_id, description)`** + - **Description:** Updates the description of a specific case. This replaces the existing description entirely. + - **Parameters:** + - `case_id` (required): The ID of the case. + - `description` (required): The new description text for the case. + - **Returns:** Confirmation of the description update. + - **Return Example:** + ```json + { + "success": true, + "case_id": 12345, + "description": "Confirmed phishing campaign targeting finance department. Three users clicked malicious links.", + "timestamp": "2023-09-15T15:45:30Z" + } + ``` + +- **`close_case(case_id, root_cause, comment, reason, tags=None)`** + - **Description:** Closes a specific case by setting its root cause, close reason, and a closing comment. Marks the end of the investigation lifecycle. + - **Parameters:** + - `case_id` (required): The ID of the case. + - `root_cause` (required): The determined root cause of the incident. + - `comment` (required): A comment explaining the closure decision. + - `reason` (required): The close reason category. Must be one of: "Malicious", "NotMalicious", "Maintenance", "Inconclusive". + - `tags` (optional): Comma-separated tags to apply to the case when closing. + - **Returns:** Confirmation of the case closure. + - **Return Example:** + ```json + { + "success": true, + "case_id": 12345, + "status": "Closed", + "reason": "Malicious", + "root_cause": "Phishing email with credential harvester", + "timestamp": "2023-09-15T16:00:00Z" + } + ``` + +- **`assign_case(case_id, user)`** + - **Description:** Assigns a specific analyst or responder to a specific case, establishing ownership for the investigation. + - **Parameters:** + - `case_id` (required): The ID of the case. + - `user` (required): The username or email of the user to assign. + - **Returns:** Confirmation of the assignment. + - **Return Example:** + ```json + { + "success": true, + "case_id": 12345, + "assigned_user": "john.analyst@example.com", + "timestamp": "2023-09-15T14:35:00Z" + } + ``` + +- **`change_case_stage(case_id, stage)`** + - **Description:** Updates the workflow stage of a specific case to reflect its current position in the incident response lifecycle. Stages are configurable per SOAR deployment. + - **Parameters:** + - `case_id` (required): The ID of the case. + - `stage` (required): The new stage name (e.g., "Triage", "Investigation", "Containment", "Remediation"). + - **Returns:** Confirmation of the stage change. + - **Return Example:** + ```json + { + "success": true, + "case_id": 12345, + "stage": "Investigation", + "timestamp": "2023-09-15T14:40:00Z" + } + ``` + +- **`add_case_tag(case_id, tag)`** + - **Description:** Adds a descriptive tag to a specific case for categorization, filtering, and reporting purposes. + - **Parameters:** + - `case_id` (required): The ID of the case. + - `tag` (required): The tag to add (e.g., "phishing", "ransomware"). + - **Returns:** Confirmation of the tag addition. + - **Return Example:** + ```json + { + "success": true, + "case_id": 12345, + "tag": "phishing", + "timestamp": "2023-09-15T14:45:00Z" + } + ``` + +- **`remove_case_tag(case_id, tag)`** + - **Description:** Removes a previously applied tag from a specific case. + - **Parameters:** + - `case_id` (required): The ID of the case. + - `tag` (required): The tag to remove. + - **Returns:** Confirmation of the tag removal. + - **Return Example:** + ```json + { + "success": true, + "case_id": 12345, + "tag": "false-positive", + "removed": true, + "timestamp": "2023-09-15T14:50:00Z" + } + ``` + - **`get_entities_by_alert_group_identifiers(case_id, alert_group_identifiers)`** - **Description:** Retrieves entities (like IPs, users, hosts) involved in one or more alert groups within a case. Can also be used to get target entities for manual actions. - **Parameters:** diff --git a/extensions/google-secops/TOOL_MAPPING.md b/extensions/google-secops/TOOL_MAPPING.md index 6158e6e3..e2b49907 100644 --- a/extensions/google-secops/TOOL_MAPPING.md +++ b/extensions/google-secops/TOOL_MAPPING.md @@ -14,7 +14,11 @@ When executing a skill, the agent should first check which tools are available i | | Get Case Details | `get_case` | `get_case_full_details` | Local `get_case_full_details` aggregates alerts/comments. Remote `get_case` fetches the case object; use `expand='tasks,tags,products'` or call `list_case_alerts`/`list_case_comments` for full context. | | | Comment on Case | `create_case_comment` | `post_case_comment` | | | | Update Case | `update_case` | `change_case_priority` | Remote tool is general (priority, status, assignee). Local tool is specific to priority. | -| | Close Case | `execute_bulk_close_case` | *(No local tool)* | Only remote tool can close cases. | +| | Close Case | `execute_bulk_close_case` | `close_case` | Remote uses bulk endpoint; local closes a single case. | +| | Assign Case | *(No direct equivalent)* | `assign_case` | Local tool assigns a user to a case. | +| | Change Case Stage | *(No direct equivalent)* | `change_case_stage` | Local tool updates the workflow stage of a case. | +| | Add Case Tag | *(No direct equivalent)* | `add_case_tag` | Local tool adds a tag to a case. | +| | Remove Case Tag | *(No direct equivalent)* | `remove_case_tag` | Local tool removes a tag from a case. | | **Alerts (SOAR)** | List Alerts for Case | `list_case_alerts` | `list_alerts_by_case` | | | | List Events for Alert | `list_connector_events` | `list_events_by_alert` | Remote tool lists "connector events". | | | List Alert Groups | *(No direct equivalent)* | `list_alert_group_identifiers_by_case` | Remote `list_case_alerts` returns alert objects which may contain grouping info. | diff --git a/server/secops-soar/secops_soar_mcp/case_management.py b/server/secops-soar/secops_soar_mcp/case_management.py index 2c338371..86400679 100644 --- a/server/secops-soar/secops_soar_mcp/case_management.py +++ b/server/secops-soar/secops_soar_mcp/case_management.py @@ -25,6 +25,88 @@ def register_tools(mcp: FastMCP): + @mcp.tool() + async def create_case( + name: Annotated[str, Field(..., description="The name or title of the case.")], + priority: Annotated[ + Optional[str], + Field( + default=None, + description="The priority of the case.", + json_schema_extra={ + "enum": [ + "PriorityUnspecified", + "PriorityInfo", + "PriorityLow", + "PriorityMedium", + "PriorityHigh", + "PriorityCritical", + ] + }, + ), + ], + description: Annotated[ + Optional[str], + Field( + default=None, + description="A description for the case providing context about the security incident.", + ), + ], + environment: Annotated[ + Optional[str], + Field( + default=None, + description="The environment for the case.", + ), + ], + ) -> dict: + """Create a new manual case in the Security Orchestration, Automation, and Response (SOAR) platform. + + This tool creates a new case for tracking a security incident, investigation, + or other security-related activity. Manual case creation is useful when an analyst + identifies a potential security issue that has not been automatically detected + or when a case needs to be created for tracking purposes outside of automated + alert ingestion workflows. + + Args: + name (str): The name or title for the new case. Should be descriptive enough + to identify the security incident or investigation at a glance. + (Example: "Suspicious Login Attempts from External IP") + priority (Optional[str]): The priority level to assign to the case. Must be one of: + PriorityUnspecified, PriorityInfo, PriorityLow, + PriorityMedium, PriorityHigh, PriorityCritical. + If not provided, the platform default will be used. + description (Optional[str]): A detailed description providing context about the + security incident or investigation purpose. + (Example: "Multiple failed login attempts detected from IP 203.0.113.50") + environment (Optional[str]): The environment in which this case should be created. + (Example: "Default Environment") + + Returns: + dict: A dictionary representing the raw API response from the SOAR platform, + containing the newly created case details including its assigned ID, + name, priority, and other metadata. + + **Workflow Integration:** + - Use when an analyst needs to manually initiate a case for a security incident + not automatically captured by detection rules or alert ingestion. + - Useful for tracking ad-hoc investigations, threat hunts, or external incident reports. + + **Next Steps (using MCP-enabled tools):** + - Use `post_case_comment` to add initial investigation notes or context to the new case. + - Use `change_case_priority` to adjust the priority as more information becomes available. + - Use `update_case_description` to refine the case description with additional details. + - Use `get_case_full_details` to verify the case was created correctly and review its full state. + """ + req = {"Name": name} + if priority is not None: + req["Priority"] = priority + if description is not None: + req["Description"] = description + if environment is not None: + req["Environment"] = environment + return await bindings.http_client.post(Endpoints.BASE_CASE_URL, req=req) + @mcp.tool() async def list_cases( next_page_token: Annotated[ @@ -312,6 +394,317 @@ async def change_case_priority( req={"Priority": case_priority}, ) + @mcp.tool() + async def update_case_description( + case_id: Annotated[str, Field(..., description="The ID of the case.")], + description: Annotated[ + str, + Field( + ..., + description="The new description for the case.", + ), + ], + ): + """Update the description of a specific case in the SOAR platform. + + This tool replaces the existing description of a case with a new one. Case + descriptions provide important context about the security incident being + investigated. Updating the description is useful as an investigation progresses + and more details become available, ensuring the case summary accurately reflects + the current understanding of the incident. + + Note: This operation replaces the entire description. The previous description + will be overwritten and not appended to. + + Args: + case_id (str): The unique identifier (ID) of the case whose description + needs to be updated. (Example: "523") + description (str): The new description text for the case. This will replace + the existing description entirely. + (Example: "Confirmed phishing campaign targeting finance department. Three users clicked malicious links.") + + Returns: + dict: A dictionary representing the raw API response from the SOAR platform, + usually confirming the successful update of the case description or + indicating any errors encountered. + + **Workflow Integration:** + - Use during or after an investigation to update the case description with + the latest findings, conclusions, or context gathered from various sources + (SIEM, TI, EDR, etc.). + - Helps maintain an accurate and up-to-date summary of the incident for other + analysts or stakeholders reviewing the case. + + **Next Steps (using MCP-enabled tools):** + - Use `post_case_comment` to document the reason for the description update. + - Use `get_case_full_details` to verify the description was updated correctly. + - Continue the investigation using other available tools. + """ + return await bindings.http_client.post( + Endpoints.CHANGE_CASE_DESCRIPTION, + req={"CaseId": case_id, "Description": description}, + ) + + @mcp.tool() + async def close_case( + case_id: Annotated[str, Field(..., description="The ID of the case.")], + root_cause: Annotated[ + str, Field(..., description="The root cause of the case.") + ], + comment: Annotated[ + str, + Field( + ..., + description="A comment explaining why the case is being closed.", + ), + ], + reason: Annotated[ + str, + Field( + ..., + description="The close reason for the case.", + json_schema_extra={ + "enum": [ + "Malicious", + "NotMalicious", + "Maintenance", + "Inconclusive", + ] + }, + ), + ], + tags: Annotated[ + Optional[str], + Field( + default=None, + description="Optional comma-separated tags to apply to the case when closing.", + ), + ], + ): + """Close a specific case in the SOAR platform. + + Closing a case (with a reason such as Malicious, NotMalicious, Maintenance, or + Inconclusive) marks the end of the investigation lifecycle and records the final + determination. This is a critical step for metrics, reporting, and ensuring the + incident queue reflects only active investigations. The case closure requires + a root cause, a comment explaining the decision, and a close reason category. + + Args: + case_id (str): The unique identifier (ID) of the case whose investigation + has concluded and needs to be closed. (Example: "523") + root_cause (str): The determined root cause of the incident being closed. + (Example: "Phishing email with credential harvester") + comment (str): A comment explaining the closure decision and any final notes + for the case record. + (Example: "Confirmed phishing. Credentials reset, mailbox rules cleaned.") + reason (str): The close reason category for the case. Must be one of: + Malicious, NotMalicious, Maintenance, Inconclusive. + tags (Optional[str]): Comma-separated tags to apply to the case when closing. + (Example: "phishing,credential-theft") + + Returns: + dict: A dictionary representing the raw API response from the SOAR platform, + usually confirming the successful closure of the case or indicating + any errors encountered. + + **Workflow Integration:** + - Use at the end of an investigation when a final determination has been reached + based on findings from connected MCP tools (SIEM, TI, EDR, etc.). + - Ensures the case is properly documented and removed from the active queue. + + **Next Steps (using MCP-enabled tools):** + - Use `list_cases` to verify the case no longer appears in the active case list. + - Use `get_case_full_details` to confirm the closure details were recorded correctly. + - Document any remaining findings or remediation steps using a case commenting tool. + """ + req = { + "CaseId": case_id, + "RootCause": root_cause, + "Comment": comment, + "Reason": reason, + } + if tags is not None: + req["Tags"] = tags + return await bindings.http_client.post(Endpoints.CLOSE_CASE, req=req) + + @mcp.tool() + async def assign_case( + case_id: Annotated[str, Field(..., description="The ID of the case.")], + user: Annotated[ + str, + Field( + ..., + description="The username or email of the user to assign to the case.", + ), + ], + ): + """Assign a user to a specific case in the SOAR platform. + + Case assignment establishes ownership for the investigation, which is essential + for workload distribution, accountability, and ensuring every incident has a + responsible investigator. The assigned user becomes the primary analyst + responsible for driving the investigation forward. + + Args: + case_id (str): The unique identifier (ID) of the case to which a user + should be assigned. (Example: "523") + user (str): The username or email of the analyst to assign to the case. + (Example: "john.analyst@example.com") + + Returns: + dict: A dictionary representing the raw API response from the SOAR platform, + usually confirming the successful assignment of the user to the case + or indicating any errors encountered. + + **Workflow Integration:** + - Use during triage to assign unowned cases to available analysts based on + findings from connected MCP tools (SIEM, TI, EDR, etc.). + - Use to reassign cases when workload balancing or escalation is needed. + + **Next Steps (using MCP-enabled tools):** + - Document the reason for the assignment using a case commenting tool. + - Use `get_case_full_details` to verify the assignment was applied correctly. + - Adjust investigation efforts based on the assigned analyst's expertise. + """ + return await bindings.http_client.post( + Endpoints.ASSIGN_USER_TO_CASE, + req={"CaseId": case_id, "User": user}, + ) + + @mcp.tool() + async def change_case_stage( + case_id: Annotated[str, Field(..., description="The ID of the case.")], + stage: Annotated[ + str, + Field( + ..., + description="The new stage for the case. Stages are configurable per deployment (e.g., 'Triage', 'Investigation', 'Containment', 'Remediation').", + ), + ], + ): + """Change the workflow stage of a specific case in the SOAR platform. + + Case stages (e.g., Triage, Investigation, Containment, Eradication, Recovery) + represent the current position in the incident response lifecycle. Stages are + configurable per SOAR deployment, so the available stage names depend on the + platform configuration. Updating the stage helps track investigation progress + and can trigger stage-specific playbooks or notifications. + + Args: + case_id (str): The unique identifier (ID) of the case whose stage needs + to be updated. (Example: "523") + stage (str): The new stage name to set for the case. Must match a stage + configured in the SOAR deployment. + (Example: "Investigation") + + Returns: + dict: A dictionary representing the raw API response from the SOAR platform, + usually confirming the successful update of the case stage or + indicating any errors encountered. + + **Workflow Integration:** + - Use as an investigation progresses through different phases of the incident + response lifecycle, informed by findings from connected MCP tools (SIEM, TI, EDR, etc.). + - Stage changes may trigger automated playbooks or notifications configured in + the SOAR platform. + + **Next Steps (using MCP-enabled tools):** + - Document the reason for the stage transition using a case commenting tool. + - Adjust investigation efforts based on the new stage requirements. + """ + return await bindings.http_client.post( + Endpoints.CHANGE_CASE_STAGE, + req={"CaseId": case_id, "Stage": stage}, + ) + + @mcp.tool() + async def add_case_tag( + case_id: Annotated[str, Field(..., description="The ID of the case.")], + tag: Annotated[ + str, + Field( + ..., + description="The tag to add to the case.", + ), + ], + ): + """Add a tag to a specific case in the SOAR platform. + + Tags (e.g., "phishing", "ransomware", "insider-threat") are descriptive labels + applied to cases for categorization, filtering, and reporting purposes. Adding + tags helps analysts quickly identify case themes and enables efficient filtering + across the case queue. Tags might be applied based on findings from the + investigation or during initial triage. + + Args: + case_id (str): The unique identifier (ID) of the case to which a tag + should be added. (Example: "523") + tag (str): The tag string to add to the case. + (Example: "phishing") + + Returns: + dict: A dictionary representing the raw API response from the SOAR platform, + usually confirming the successful addition of the tag to the case + or indicating any errors encountered. + + **Workflow Integration:** + - Use during triage or investigation to categorize cases by threat type, + campaign, affected business unit, or any other relevant dimension based on + findings from connected MCP tools (SIEM, TI, EDR, etc.). + - Tags can be used to filter cases in dashboards and reports. + + **Next Steps (using MCP-enabled tools):** + - Document the reason for tagging using a case commenting tool. + - Use `get_case_full_details` to verify the tag was applied correctly. + """ + return await bindings.http_client.post( + Endpoints.ADD_CASE_TAG, + req={"CaseId": case_id, "Tag": tag}, + ) + + @mcp.tool() + async def remove_case_tag( + case_id: Annotated[str, Field(..., description="The ID of the case.")], + tag: Annotated[ + str, + Field( + ..., + description="The tag to remove from the case.", + ), + ], + ): + """Remove a tag from a specific case in the SOAR platform. + + Tags are descriptive labels applied to cases for categorization and filtering. + Removing a tag is useful when a tag was applied in error, when the case + categorization changes during investigation based on new findings, or when + cleaning up tags after an investigation concludes. + + Args: + case_id (str): The unique identifier (ID) of the case from which a tag + should be removed. (Example: "523") + tag (str): The tag string to remove from the case. + (Example: "false-positive") + + Returns: + dict: A dictionary representing the raw API response from the SOAR platform, + usually confirming the successful removal of the tag from the case + or indicating any errors encountered. + + **Workflow Integration:** + - Use when a case's categorization changes during investigation based on new + information from connected MCP tools (SIEM, TI, EDR, etc.). + - Use to correct mistakenly applied tags. + + **Next Steps (using MCP-enabled tools):** + - Document the reason for the tag removal using a case commenting tool. + - Use `add_case_tag` to apply a corrected tag if needed. + """ + return await bindings.http_client.post( + Endpoints.REMOVE_CASE_TAG, + req={"CaseId": case_id, "Tag": tag}, + ) + @mcp.tool() async def get_entities_by_alert_group_identifiers( case_id: Annotated[str, Field(..., description="The ID of the case.")], diff --git a/server/secops-soar/secops_soar_mcp/utils/consts.py b/server/secops-soar/secops_soar_mcp/utils/consts.py index 3ff9448c..ae0df312 100644 --- a/server/secops-soar/secops_soar_mcp/utils/consts.py +++ b/server/secops-soar/secops_soar_mcp/utils/consts.py @@ -36,6 +36,13 @@ class Endpoints: GET_ALERT_GROUP_IDENTIFIERS_ENTITIES = ( "/api/external/v1/case-overview/GetAlertsEntities" ) + BASE_LEGACY_CASE_URL = "/api/external/v1/cases" + CHANGE_CASE_DESCRIPTION = BASE_LEGACY_CASE_URL + "/ChangeCaseDescription" + CLOSE_CASE = BASE_LEGACY_CASE_URL + "/CloseCase" + ASSIGN_USER_TO_CASE = BASE_LEGACY_CASE_URL + "/AssignUserToCase" + ADD_CASE_TAG = BASE_LEGACY_CASE_URL + "/AddCaseTag" + REMOVE_CASE_TAG = BASE_LEGACY_CASE_URL + "/RemoveCaseTag" + CHANGE_CASE_STAGE = BASE_LEGACY_CASE_URL + "/ChangeCaseStage" # the below endpoint uses 'alerts' instead of 'caseAlerts', because of a bug that was introduced to the SOAR server LIST_INVOLVED_EVENTS_BY_ALERT = ( "/api/1p/external/v1.0/cases/{CASE_ID}/alerts/{ALERT_ID}/involvedEvents"