diff --git a/.gitignore b/.gitignore index a614800b..86edf58e 100644 --- a/.gitignore +++ b/.gitignore @@ -203,6 +203,9 @@ app_data.db # Don't commit the evaluation data *.evalset.json + +# Local reference docs (not for repo) +stats_doc.md .adk/ # Gemini diff --git a/server/secops/secops_mcp/tools/__init__.py b/server/secops/secops_mcp/tools/__init__.py index 1b16e316..a9875f62 100644 --- a/server/secops/secops_mcp/tools/__init__.py +++ b/server/secops/secops_mcp/tools/__init__.py @@ -25,6 +25,7 @@ from .rule_exclusions import * from .search import * from .security_alerts import * +from .stats import * from .security_events import * from .security_rules import * from .threat_intel import * diff --git a/server/secops/secops_mcp/tools/stats.py b/server/secops/secops_mcp/tools/stats.py new file mode 100644 index 00000000..59abbb25 --- /dev/null +++ b/server/secops/secops_mcp/tools/stats.py @@ -0,0 +1,210 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
"""Security Operations MCP tools for Chronicle stats/aggregation queries."""

import logging
from typing import Any, Dict, Optional

from secops_mcp.server import get_chronicle_client, server
from secops_mcp.utils import parse_time_range

logger = logging.getLogger("secops-mcp")


def _stats_error(message: str) -> Dict[str, Any]:
    """Build the failure payload that ``get_stats`` documents.

    Keeping the shape in one place guarantees every error path returns the
    same keys as a successful call, plus ``"error"``.
    """
    return {"error": message, "columns": [], "rows": [], "total_rows": 0}


@server.tool()
async def get_stats(
    query: str,
    hours_back: int = 24,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
    max_values: int = 60,
    max_events: int = 10000,
    case_insensitive: bool = True,
    timeout: int = 120,
    project_id: Optional[str] = None,
    customer_id: Optional[str] = None,
    region: Optional[str] = None,
) -> Dict[str, Any]:
    """Run a YARA-L 2.0 stats/aggregation query against Chronicle UDM events.

    Executes a Chronicle UDM query using YARA-L 2.0 match and outcome sections,
    returning aggregated results as structured columns and rows. Stats queries are
    significantly faster than full event fetches because they return summary data
    only — ideal for long-tail analysis, frequency counts, top-N rankings, and
    time-bucketed roll-ups.

    Note: Statistical query results are available two hours after ingestion.

    **When to use get_stats vs search_udm:**
    - Use `get_stats` when you need counts, aggregations, or summaries (e.g., top
      source IPs, event frequency by user, distinct process names per host).
    - Use `search_udm` when you need the raw event details themselves.

    **YARA-L 2.0 Query Structure:**
    A stats query is composed of the following sections in order:

    1. **Filtering statement** (required): UDM conditions to filter events.
    2. **match** (optional): Fields to group by, with optional time granularity.
       Syntax: ``match:\\n  field1, field2 [by|over every] [N] [time_unit]``
    3. **outcome** (required): Aggregate expressions assigned to ``$variables``.
       Syntax: ``outcome:\\n  $alias = function(field)``
    4. **order** (optional): Sort direction — ``asc`` (default) or ``desc``.
       Syntax: ``order:\\n  $alias desc``
    5. **limit** (optional): Maximum rows returned.
       Syntax: ``limit:\\n  20``

    **Supported Aggregate Functions:**
    - ``array(field)`` — all values as a list (truncated to 25 random elements)
    - ``array_distinct(field)`` — distinct values as a list (max 25 elements)
    - ``avg(numericField)`` — average, ignoring NULLs
    - ``count(field)`` — number of rows in the group
    - ``count_distinct(field)`` — number of distinct values in the group
    - ``earliest(timestamp)`` — earliest timestamp with microsecond resolution
    - ``latest(timestamp)`` — latest timestamp with microsecond resolution
    - ``max(numericField)`` — maximum value in the group
    - ``min(numericField)`` — minimum value in the group
    - ``stddev(numericField)`` — standard deviation
    - ``sum(numericField)`` — sum, ignoring NULLs

    **Time Granularity (match section):**
    Group by time using ``by`` or ``over every`` with: ``minute``/``m``,
    ``hour``/``h``, ``day``/``d``, ``week``/``w``, ``month``/``mo``.
    Both keywords are functionally equivalent.

    **Example Queries:**
    ```
    # Count of successful user logins grouped by date
    metadata.event_type = "USER_LOGIN"
    $security_result = security_result.action
    $security_result = "ALLOW"
    $date = timestamp.get_date(metadata.event_timestamp.seconds, "America/Los_Angeles")
    match:
      $security_result, $date
    outcome:
      $event_count = count(metadata.id)

    # Top 20 IPs by unique user count (OKTA logs)
    metadata.log_type = "OKTA"
    match:
      principal.ip
    outcome:
      $user_count_by_ip = count_distinct(principal.user.userid)
    order:
      $user_count_by_ip desc
    limit:
      20

    # Event volume per hostname bucketed by day
    $hostname = target.hostname
    match:
      $hostname over every day
    outcome:
      $events_count = count($hostname)

    # Total bytes sent per source IP
    target.ip != ""
    match:
      principal.ip
    outcome:
      $sent_bytes = sum(network.sent_bytes)
    ```

    **Workflow Integration:**
    - Use to triage alerts by frequency before deep-diving into raw events.
    - Identify rare/anomalous values (long-tail) — low-count rows in ``count()``.
    - Scope the blast radius of an incident (how many hosts? how many users?).
    - Feed results into ``search_udm`` with specific field values for full event context.

    Args:
        query (str): YARA-L 2.0 query with a filtering statement and outcome section.
            The match section is optional; outcome is required for aggregation.
            An empty or whitespace-only query is rejected without calling Chronicle.
        hours_back (int): Hours to look back from now when start_time is not given.
            Defaults to 24.
        start_time (Optional[str]): Start of time range in ISO 8601 format
            (e.g. "2024-01-15T00:00:00Z"). Overrides hours_back.
        end_time (Optional[str]): End of time range in ISO 8601 format. Defaults to now.
        max_values (int): Maximum number of aggregated rows (buckets) to return.
            Defaults to 60. Increase for high-cardinality group-by fields.
            Must not be negative.
        max_events (int): Maximum raw events scanned during aggregation. Defaults to
            10000. Increase for more complete results on busy environments.
            Must not be negative.
        case_insensitive (bool): Whether string comparisons are case-insensitive.
            Defaults to True.
        timeout (int): Request timeout in seconds. Defaults to 120. Increase for
            complex queries over large time windows. Must not be negative.
        project_id (Optional[str]): Google Cloud project ID. Defaults to env config.
        customer_id (Optional[str]): Chronicle customer ID. Defaults to env config.
        region (Optional[str]): Chronicle region (e.g. "us", "europe"). Defaults to env config.

    Returns:
        Dict[str, Any]: A dictionary with:
            - "columns" (List[str]): Ordered list of column names from the outcome section.
            - "rows" (List[Dict]): Each row is a dict mapping column name to its value.
              Values are typed: int, float, str, datetime, list, or None.
            - "total_rows" (int): Number of rows returned.
        Returns {"error": str, "columns": [], "rows": [], "total_rows": 0} on failure
        (invalid arguments, unparseable dates, or any error raised by the client).
        This function never raises.

    Example Output:
        {
            "columns": ["principal.ip", "$user_count_by_ip"],
            "rows": [
                {"principal.ip": "10.1.2.3", "$user_count_by_ip": 412},
                {"principal.ip": "192.168.0.55", "$user_count_by_ip": 87}
            ],
            "total_rows": 2
        }

    Next Steps (using MCP-enabled tools):
        - Pivot on high-count rows by passing specific values to ``search_udm`` for
          full event context.
        - Use ``lookup_entity`` on suspicious IPs or hostnames surfaced by stats.
        - Build detection rules targeting aggregation patterns identified here.
        - Export raw matching events with ``export_udm_search_csv`` for top offenders.
    """
    try:
        # Reject obviously unusable arguments locally instead of spending a
        # remote round-trip (and up to `timeout` seconds) to learn the same.
        if not isinstance(query, str) or not query.strip():
            return _stats_error("query must be a non-empty YARA-L 2.0 stats query")

        limits = (
            ("max_values", max_values),
            ("max_events", max_events),
            ("timeout", timeout),
        )
        for name, value in limits:
            if value < 0:
                return _stats_error(f"{name} must not be negative (got {value})")

        # Only ValueError is treated as a date-format problem; anything else
        # raised while parsing falls through to the generic handler below.
        try:
            start_dt, end_dt = parse_time_range(start_time, end_time, hours_back)
        except ValueError as e:
            logger.error("Error parsing date format: %s", e, exc_info=True)
            return _stats_error(
                f"Error parsing date format: {e}. "
                "Use ISO 8601 format (e.g., 2024-01-15T12:00:00Z)"
            )

        logger.info(
            "Running stats query - Query: %s, Time Range: %s to %s, max_values: %s",
            query,
            start_dt,
            end_dt,
            max_values,
        )

        chronicle = get_chronicle_client(project_id, customer_id, region)

        # NOTE(review): this call is not awaited, so it appears to be synchronous
        # and would occupy the event loop for its duration — confirm whether
        # that is acceptable for this server.
        results = chronicle.get_stats(
            query=query,
            start_time=start_dt,
            end_time=end_dt,
            max_values=max_values,
            timeout=timeout,
            max_events=max_events,
            case_insensitive=case_insensitive,
        )

        # Callers rely on the dict contract; say so plainly instead of leaking
        # an AttributeError message from the logging code below.
        if not isinstance(results, dict):
            return _stats_error(
                f"Unexpected stats response type: {type(results).__name__}"
            )

        logger.info(
            "Stats query returned %s rows across %s columns",
            results.get("total_rows", 0),
            len(results.get("columns", [])),
        )

        return results

    except Exception as e:
        # Tool boundary: report every failure as data rather than raising.
        logger.error("Error running stats query: %s", e, exc_info=True)
        return _stats_error(str(e))
# Integration tests for get_stats. Both take ``self``: they are methods of the
# integration test class whose header lies outside this view.

@pytest.mark.asyncio
async def test_get_stats_basic(self, chronicle_config: Dict[str, str]) -> None:
    """Test that get_stats returns a structured aggregation result from Chronicle.

    Runs a count-by-event-type stats query over the last 24 hours. We cannot
    assert specific row values (data varies per environment), but we verify the
    response contract: correct keys, typed values, no error field, and
    row/total_rows consistency.

    The query follows the YARA-L 2.0 layout the tool documents (filtering
    statement, ``match:``, ``outcome:``) rather than a ``| stats ...`` pipe
    expression, which has neither a filter nor an outcome section.

    Args:
        chronicle_config: Dictionary with Chronicle configuration
    """
    result = await get_stats(
        query=(
            "$event_type = metadata.event_type\n"
            "match:\n"
            "  $event_type\n"
            "outcome:\n"
            "  $event_count = count(metadata.id)"
        ),
        hours_back=24,
        max_values=20,
        project_id=chronicle_config["CHRONICLE_PROJECT_ID"],
        customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"],
        region=chronicle_config["CHRONICLE_REGION"],
    )

    assert "error" not in result, f"get_stats returned an error: {result.get('error')}"
    assert "columns" in result
    assert "rows" in result
    assert "total_rows" in result
    assert isinstance(result["columns"], list)
    assert isinstance(result["rows"], list)
    assert isinstance(result["total_rows"], int)
    # Row list length must match total_rows — the secops-wrapper contract
    assert result["total_rows"] == len(result["rows"])

    if result["rows"]:
        sample_row = result["rows"][0]
        assert isinstance(sample_row, dict)
        for col in result["columns"]:
            assert col in sample_row, f"Column '{col}' missing from row: {sample_row}"


@pytest.mark.asyncio
async def test_get_stats_count_values_are_numeric(
    self, chronicle_config: Dict[str, str]
) -> None:
    """Verify count() values are returned as int/float, not strings.

    The Chronicle API returns int64Val inside a JSON string. The secops-wrapper
    must cast it to int. If this casting breaks, downstream consumers (sorting,
    thresholding, alerting logic) will silently produce wrong results.

    Args:
        chronicle_config: Dictionary with Chronicle configuration
    """
    result = await get_stats(
        query=(
            "$event_type = metadata.event_type\n"
            "match:\n"
            "  $event_type\n"
            "outcome:\n"
            "  $total = count(metadata.id)"
        ),
        hours_back=24,
        max_values=5,
        project_id=chronicle_config["CHRONICLE_PROJECT_ID"],
        customer_id=chronicle_config["CHRONICLE_CUSTOMER_ID"],
        region=chronicle_config["CHRONICLE_REGION"],
    )

    assert "error" not in result, f"get_stats returned an error: {result.get('error')}"

    if not result["rows"]:
        # Nothing ingested in the window; there is no value to type-check.
        return

    # NOTE(review): it is not visible here whether the outcome column is
    # reported as "total" or "$total", so accept either — TODO confirm.
    # Requiring that the column exists keeps the loop below from passing
    # vacuously when the key is simply absent from every row.
    total_columns = [c for c in result["columns"] if c.lstrip("$") == "total"]
    assert total_columns, f"No 'total' outcome column in {result['columns']}"

    for row in result["rows"]:
        for col in total_columns:
            count_value = row.get(col)
            if count_value is not None:
                assert isinstance(count_value, (int, float)), (
                    f"Expected numeric count, got {type(count_value).__name__}: {count_value!r}"
                )
@pytest.fixture
def mock_get_client(mock_chronicle_client):
    """Patch ``get_chronicle_client`` in every tool module exercised here.

    Each tool module imports the factory by name, so it has to be patched where
    it is looked up. The yielded object is the shared mock client, which lets
    tests inspect the calls made on it.
    """
    with patch('secops_mcp.tools.search.get_chronicle_client', return_value=mock_chronicle_client), \
         patch('secops_mcp.tools.udm_search.get_chronicle_client', return_value=mock_chronicle_client), \
         patch('secops_mcp.tools.security_events.get_chronicle_client', return_value=mock_chronicle_client), \
         patch('secops_mcp.tools.stats.get_chronicle_client', return_value=mock_chronicle_client):
        yield mock_chronicle_client


# ---------------------------------------------------------------------------
# Stats tool tests
# ---------------------------------------------------------------------------

# A query in the YARA-L 2.0 layout that get_stats documents (filter, match,
# outcome). The client is mocked, so the text is never parsed, but keeping it
# realistic stops the tests from advertising an unsupported syntax.
_STATS_QUERY = (
    "$event_type = metadata.event_type\n"
    "match:\n"
    "  $event_type\n"
    "outcome:\n"
    "  $count = count(metadata.id)"
)


@pytest.mark.asyncio
async def test_get_stats_returns_structured_result(mock_get_client):
    """Stats results contain typed columns, rows, and total_rows."""
    result = await get_stats(
        query=_STATS_QUERY,
        hours_back=24,
        project_id="test",
        customer_id="test",
    )

    assert "error" not in result, f"Unexpected error: {result.get('error')}"
    assert result["total_rows"] == 2
    assert result["columns"] == ["metadata.event_type", "count"]
    assert len(result["rows"]) == 2
    # Verify row values are typed correctly (int, not string)
    login_row = next(r for r in result["rows"] if r["metadata.event_type"] == "USER_LOGIN")
    assert login_row["count"] == 150


@pytest.mark.asyncio
async def test_get_stats_passes_correct_time_range(mock_get_client):
    """Explicit ISO start/end times are parsed and forwarded as datetime objects."""
    result = await get_stats(
        query=_STATS_QUERY,
        start_time="2024-03-01T00:00:00Z",
        end_time="2024-03-02T00:00:00Z",
        project_id="test",
        customer_id="test",
    )

    assert "error" not in result
    kwargs = mock_get_client.get_stats.call_args[1]
    assert isinstance(kwargs["start_time"], datetime)
    assert kwargs["start_time"].year == 2024
    assert kwargs["start_time"].month == 3
    assert kwargs["start_time"].day == 1
    assert isinstance(kwargs["end_time"], datetime)
    assert kwargs["end_time"].day == 2


@pytest.mark.asyncio
async def test_get_stats_passes_max_values_param(mock_get_client):
    """max_values is forwarded to the underlying chronicle client."""
    await get_stats(
        query=_STATS_QUERY,
        max_values=100,
        project_id="test",
        customer_id="test",
    )

    call_kwargs = mock_get_client.get_stats.call_args[1]
    assert call_kwargs["max_values"] == 100


@pytest.mark.asyncio
async def test_get_stats_forwards_tuning_params(mock_get_client):
    """query, timeout, max_events and case_insensitive reach the client unchanged.

    Non-default values are used so a hard-coded default in the tool would be
    caught.
    """
    await get_stats(
        query=_STATS_QUERY,
        max_events=500,
        case_insensitive=False,
        timeout=30,
        project_id="test",
        customer_id="test",
    )

    mock_get_client.get_stats.assert_called_once()
    call_kwargs = mock_get_client.get_stats.call_args[1]
    assert call_kwargs["query"] == _STATS_QUERY
    assert call_kwargs["max_events"] == 500
    assert call_kwargs["case_insensitive"] is False
    assert call_kwargs["timeout"] == 30


@pytest.mark.asyncio
async def test_get_stats_invalid_date_returns_error(mock_get_client):
    """Invalid ISO date returns error dict — not an exception."""
    result = await get_stats(
        query=_STATS_QUERY,
        start_time="not-a-date",
        project_id="test",
        customer_id="test",
    )

    assert "error" in result
    assert "Error parsing date format" in result["error"]
    assert result["columns"] == []
    assert result["rows"] == []
    assert result["total_rows"] == 0
    # Chronicle client must NOT have been called on a bad date
    mock_get_client.get_stats.assert_not_called()


@pytest.mark.asyncio
async def test_get_stats_api_error_returns_error_dict(mock_get_client):
    """A runtime exception from the Chronicle client surfaces as an error dict."""
    mock_get_client.get_stats.side_effect = Exception("Chronicle API unavailable")

    result = await get_stats(
        query=_STATS_QUERY,
        project_id="test",
        customer_id="test",
    )

    assert "error" in result
    assert "Chronicle API unavailable" in result["error"]
    assert result["columns"] == []
    assert result["rows"] == []
    assert result["total_rows"] == 0