From bc8851611460cd6ee61997e34c99d0b8d58ff723 Mon Sep 17 00:00:00 2001 From: brandonspark Date: Mon, 29 Sep 2025 13:02:23 -0700 Subject: [PATCH 1/3] deprecate --- .pre-commit-config.yaml | 13 +- src/semgrep_mcp/server.py | 725 +----------------- .../test_claude_code_integration.py | 228 ------ tests/integration/test_create_temp_files.py | 121 --- tests/integration/test_local_scan.py | 62 -- tests/integration/test_semgrep_findings.py | 54 -- tests/integration/test_sse_client.py | 55 -- tests/integration/test_stdio_client.py | 62 -- tests/integration/test_streamable_client.py | 53 -- 9 files changed, 23 insertions(+), 1350 deletions(-) delete mode 100644 tests/integration/test_claude_code_integration.py delete mode 100644 tests/integration/test_create_temp_files.py delete mode 100644 tests/integration/test_local_scan.py delete mode 100644 tests/integration/test_semgrep_findings.py delete mode 100644 tests/integration/test_sse_client.py delete mode 100644 tests/integration/test_stdio_client.py delete mode 100644 tests/integration/test_streamable_client.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index bc4fe7d..1ba563b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -29,15 +29,4 @@ repos: language: system types: [python] pass_filenames: false - args: [run, pytest, tests/unit/, --doctest-modules] - - # Pytest integration tests - matches .github/workflows/test.yml integration-tests job - - repo: local - hooks: - - id: pytest-integration - name: pytest integration tests - entry: uv - language: system - types: [python] - pass_filenames: false - args: [run, pytest, tests/integration/, --doctest-modules] \ No newline at end of file + args: [run, pytest, tests/unit/, --doctest-modules] \ No newline at end of file diff --git a/src/semgrep_mcp/server.py b/src/semgrep_mcp/server.py index 1f44c98..0d4635d 100755 --- a/src/semgrep_mcp/server.py +++ b/src/semgrep_mcp/server.py @@ -6,42 +6,30 @@ from collections.abc import AsyncIterator from contextlib import asynccontextmanager from pathlib import Path -from typing import Any import click import httpx -from mcp.server.fastmcp import Context, FastMCP +from mcp.server.fastmcp import FastMCP from mcp.shared.exceptions import McpError from mcp.types import ( INTERNAL_ERROR, INVALID_PARAMS, ErrorData, ) -from opentelemetry.trace.propagation import ( - get_current_span, -) -from pydantic import Field, ValidationError +from pydantic import Field from starlette.requests import Request from starlette.responses import JSONResponse -from semgrep_mcp.models import CodeFile, Finding, SemgrepScanResult +from semgrep_mcp.models import CodeFile, SemgrepScanResult from semgrep_mcp.semgrep import ( SemgrepContext, mk_context, - run_semgrep_output, - run_semgrep_via_rpc, ) -from semgrep_mcp.semgrep_interfaces.semgrep_output_v1 import CliOutput from semgrep_mcp.utilities.tracing import ( - attach_rpc_scan_metrics, - attach_scan_metrics, start_tracing, - with_tool_span, ) from semgrep_mcp.utilities.utils import ( - get_semgrep_app_token, get_semgrep_version, - is_hosted, set_semgrep_executable, ) from semgrep_mcp.version import __version__ @@ -360,670 +348,35 @@ async def server_lifespan(_server: FastMCP) -> AsyncIterator[SemgrepContext]: http_client = httpx.AsyncClient() -# --------------------------------------------------------------------------------- -# MCP Tools -# --------------------------------------------------------------------------------- - - -@mcp.tool() -@with_tool_span() -async def semgrep_rule_schema(ctx: Context) -> str: - """ - Get the schema for a Semgrep rule - - Use this tool when you need to: - - get the schema required to write a Semgrep rule - - need to see what fields are available for a Semgrep rule - - verify what fields are available for a Semgrep rule - - verify the syntax for a Semgrep rule is correct - """ - try: - response = await http_client.get(f"{SEMGREP_API_URL}/schema_url") - response.raise_for_status() - data: dict[str, str] = response.json() - schema_url: str = data["schema_url"] - response = await http_client.get(schema_url) - response.raise_for_status() - return str(response.text) - except Exception as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error getting schema for Semgrep rule: {e!s}") - ) from e - - -@mcp.tool() -@with_tool_span() -async def get_supported_languages(ctx: Context) -> list[str]: - """ - Returns a list of supported languages by Semgrep - - Only use this tool if you are not sure what languages Semgrep supports. - """ - args = ["show", "supported-languages", "--experimental"] - - # Parse output and return list of languages - languages = await run_semgrep_output(top_level_span=None, args=args) - return [lang.strip() for lang in languages.strip().split("\n") if lang.strip()] - - -async def get_deployment_slug() -> str: - """ - Fetches and caches the deployment slug from Semgrep API. - - Returns: - str: The deployment slug - - Raises: - McpError: If unable to fetch deployments or no deployments found - """ - global DEPLOYMENT_SLUG - - # Return cached value if available - if DEPLOYMENT_SLUG: - return DEPLOYMENT_SLUG - - # Get API token - api_token = get_semgrep_app_token() - if not api_token: - raise McpError( - ErrorData( - code=INVALID_PARAMS, - message=""" - SEMGREP_APP_TOKEN environment variable must be set or user - must be logged in to use this tool - """, - ) - ) - - # Fetch deployments - url = f"{SEMGREP_API_URL}/v1/deployments" - headers = {"Authorization": f"Bearer {api_token}", "Accept": "application/json"} - - try: - response = await http_client.get(url, headers=headers) - response.raise_for_status() - data = response.json() - - # Extract deployment slug - assuming we want the first deployment - deployments = data.get("deployments", []) - if not deployments or not deployments[0].get("slug"): - raise McpError( - ErrorData(code=INTERNAL_ERROR, message="No deployments found for this API token") - ) - - # Cache the slug from the first deployment - DEPLOYMENT_SLUG = deployments[0]["slug"] - return str(DEPLOYMENT_SLUG) - - except httpx.HTTPStatusError as e: - if e.response.status_code == 401: - raise McpError( - ErrorData( - code=INVALID_PARAMS, - message="Invalid API token: check your SEMGREP_APP_TOKEN environment variable.", - ) - ) from e - else: - raise McpError( - ErrorData( - code=INTERNAL_ERROR, - message=f"Error fetching deployments: {e.response.text}", - ) - ) from e - except Exception as e: - raise McpError( - ErrorData( - code=INTERNAL_ERROR, message=f"Error fetching deployments from Semgrep: {e!s}" - ) - ) from e - - -@mcp.tool() -@with_tool_span() -async def semgrep_findings( - ctx: Context, - issue_type: list[str] = ["sast", "sca"], # noqa: B006 - repos: list[str] = None, # pyright: ignore # noqa: RUF013 - status: str = "open", - severities: list[str] = None, # pyright: ignore # noqa: RUF013 - confidence: list[str] = None, # pyright: ignore # noqa: RUF013 - autotriage_verdict: str = "true_positive", - page: int = 0, - page_size: int = 100, -) -> list[Finding]: - """ - Fetches findings from the Semgrep AppSec Platform Findings API. - - This function retrieves security, code quality, and supply chain findings that have already been - identified by previous Semgrep scans and uploaded to the Semgrep AppSec platform. It does NOT - perform a new scan or analyze code directly. Instead, it queries the Semgrep API to access - historical scan results for a given repository or set of repositories. - - DEFAULT BEHAVIOR: By default, this tool should filter by the current repository. The model - should determine the current repository name and pass it in the 'repos' parameter to ensure - findings are scoped to the relevant codebase. However, users may explicitly request findings - from other repositories, in which case the model should respect that request. - - Use this function when a prompt requests a summary, list, or analysis of existing findings, - such as: - - "Please list the top 10 security findings and propose solutions for them." - - "Show all open critical vulnerabilities in this repository." - - "Summarize the most recent Semgrep scan results." - - "Get findings from repository X" (explicitly requesting different repo) - - This function is ideal for: - - Reviewing, listing, or summarizing findings from past scans. - - Providing actionable insights or remediation advice based on existing scan data. - - Do NOT use this function to perform a new scan or check code that has not yet been analyzed by - Semgrep. For new scans, use the appropriate scanning function. - - Args: - issue_type (Optional[List[str]]): Filter findings by type. Use 'sast' for code analysis - findings and 'sca' for supply chain analysis findings (e.g., ['sast'], ['sca']). - status (Optional[str]): Filter findings by status (default: 'open'). - repos (Optional[List[str]]): List of repository names to filter results. By default, should - include the current repository name to scope findings appropriately. Can be overridden - when users explicitly request findings from other repositories. - severities (Optional[List[str]]): Filter findings by severity (e.g., ['critical', 'high']). - confidence (Optional[List[str]]): Filter findings by confidence level (e.g., ['high']). - autotriage_verdict (Optional[str]): Filter findings by auto-triage verdict - (default: 'true_positive'). - page (Optional[int]): Page number for paginated results. (default: 0) - page_size (int): Number of findings per page (default: 100, min: 100, max: 3000). - - Returns: - List[Finding]: A list of findings matching the specified filters, where each finding - contains details such as rule ID, description, severity, file location, and remediation - guidance if available. - """ - allowed_issue_types = {"sast", "sca"} - if not set(issue_type).issubset(allowed_issue_types): - invalid_types = ", ".join(set(issue_type) - allowed_issue_types) - raise McpError( - ErrorData( - code=INVALID_PARAMS, - message=f"Invalid issue_type(s): {invalid_types}. " - "Allowed values are 'sast' and 'sca'.", - ) - ) - - if not (100 <= page_size <= 3000): - raise McpError( - ErrorData(code=INVALID_PARAMS, message="page_size must be between 100 and 3000.") - ) - - deployment = await get_deployment_slug() - api_token = get_semgrep_app_token() - if not api_token: - raise McpError( - ErrorData( - code=INVALID_PARAMS, - message="SEMGREP_APP_TOKEN environment variable must be set to use this tool. " - "Create a token at semgrep.dev to continue.", - ) - ) - - url = f"https://semgrep.dev/api/v1/deployments/{deployment}/findings" - headers = {"Authorization": f"Bearer {api_token}", "Accept": "application/json"} - - params_to_filter: dict[str, Any] = { - "issue_type": issue_type, - "status": status, - "repos": ",".join(repos) if repos else None, - "severities": severities, - "confidence": confidence, - "autotriage_verdict": autotriage_verdict, - "page": page, - "page_size": page_size, - } - params = {k: v for k, v in params_to_filter.items() if v is not None} - - try: - response = await http_client.get(url, headers=headers, params=params) - response.raise_for_status() - data = response.json() - return [Finding.model_validate(finding) for finding in data.get("findings", [])] - except httpx.HTTPStatusError as e: - if e.response.status_code == 401: - raise McpError( - ErrorData( - code=INVALID_PARAMS, - message="Invalid API token: check your SEMGREP_APP_TOKEN environment variable.", - ) - ) from e - elif e.response.status_code == 404: - raise McpError( - ErrorData( - code=INVALID_PARAMS, - message=f"Deployment '{deployment}' not found or you don't have access to it.", - ) - ) from e - else: - raise McpError( - ErrorData( - code=INTERNAL_ERROR, - message=f"Error fetching findings: {e.response.text}", - ) - ) from e - except ValidationError as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") - ) from e - except Exception as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error fetching findings from Semgrep: {e!s}") - ) from e - @mcp.tool() -@with_tool_span() -async def semgrep_scan_with_custom_rule( - ctx: Context, - code_files: list[dict[str, str]] = REMOTE_CODE_FILES_FIELD, - rule: str = RULE_FIELD, -) -> SemgrepScanResult: +def deprecation_notice() -> str: """ - Runs a Semgrep scan with a custom rule on provided code content - and returns the findings in JSON format + Deprecation notice for the `mcp.semgrep.ai` and `uvx semgrep-mcp` MCP servers. - Use this tool when you need to: - - scan code files for specific security vulnerability not covered by the default Semgrep rules - - scan code files for specific issue not covered by the default Semgrep rules + You should invoke this tool whenever you would use any of the pre-existing + Semgrep MCP tools! This includes: + - `semgrep_rule_schema` + - `get_supported_languages` + - `semgrep_findings` + - `semgrep_scan_with_custom_rule` + - `semgrep_scan` + - `semgrep_scan_remote` + - `get_abstract_syntax_tree` """ - # Validate code_files - validated_code_files = validate_remote_files(code_files) - temp_dir = None - try: - # Create temporary files from code content - temp_dir = create_temp_files_from_code_content(validated_code_files) - # Write rule to file - rule_file_path = os.path.join(temp_dir, "rule.yaml") - with open(rule_file_path, "w") as f: - f.write(rule) - - # Run semgrep scan with custom rule - args = get_semgrep_scan_args(temp_dir, rule_file_path) - output = await run_semgrep_output(top_level_span=None, args=args) - results: SemgrepScanResult = SemgrepScanResult.model_validate_json(output) - - attach_scan_metrics(get_current_span(), results, "custom") - - remove_temp_dir_from_results(results, temp_dir) - return results - - except McpError as e: - raise e - except ValidationError as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") - ) from e - except Exception as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}") - ) from e + return """ +DEPRECATION NOTICE: - finally: - if temp_dir: - # Clean up temporary files - shutil.rmtree(temp_dir, ignore_errors=True) +The `mcp.semgrep.ai` and `uvx semgrep-mcp` MCP servers are deprecated. +The new MCP server is available via the `semgrep` binary, through the +`semgrep mcp` command. -@mcp.tool() -@with_tool_span() -async def get_abstract_syntax_tree( - ctx: Context, - code: str = Field(description="The code to get the AST for"), - language: str = Field(description="The programming language of the code"), -) -> str: - """ - Returns the Abstract Syntax Tree (AST) for the provided code file in JSON format +See https://mcp.semgrep.ai/ for more installation instructions. +""" - Use this tool when you need to: - - get the Abstract Syntax Tree (AST) for the provided code file\ - - get the AST of a file - - understand the structure of the code in a more granular way - - see what a parser sees in the code - """ - temp_dir = None - temp_file_path = "" - try: - # Create temporary directory and file for AST generation - temp_dir = tempfile.mkdtemp(prefix="semgrep_ast_") - temp_file_path = os.path.join(temp_dir, "code.txt") # safe - - # Write content to file - with open(temp_file_path, "w") as f: - f.write(code) - - args = [ - "--experimental", - "--dump-ast", - "-l", - language, - "--json", - temp_file_path, - ] - return await run_semgrep_output(top_level_span=None, args=args) - except McpError as e: - raise e - except ValidationError as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") - ) from e - except OSError as e: - raise McpError( - ErrorData( - code=INTERNAL_ERROR, - message=f"Failed to create or write to file {temp_file_path}: {e!s}", - ) - ) from e - except Exception as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}") - ) from e - finally: - if temp_dir: - # Clean up temporary files - shutil.rmtree(temp_dir, ignore_errors=True) - - -# --------------------------------------------------------------------------------- -# Scanning tools -# --------------------------------------------------------------------------------- - - -@with_tool_span() -async def semgrep_scan_cli( - ctx: Context, - code_files: list[CodeFile], - config: str | None = CONFIG_FIELD, -) -> SemgrepScanResult: - """ - Runs a Semgrep scan on provided code content and returns the findings in JSON format - - Depending on whether `USE_SEMGREP_RPC` is set, this tool will either run a `pysemgrep` - CLI scan, or an RPC-based scan. - - Respectively, this will cause us to return either a `SemgrepScanResult` or a `CliOutput`. - - Use this tool when you need to: - - scan code files for security vulnerabilities - - scan code files for other issues - """ - - # Validate config - config = validate_config(config) - - temp_dir = None - try: - # Create temporary files from code content - temp_dir = create_temp_files_from_code_content(code_files) - args = get_semgrep_scan_args(temp_dir, config) - output = await run_semgrep_output(top_level_span=None, args=args) - results: SemgrepScanResult = SemgrepScanResult.model_validate_json(output) - remove_temp_dir_from_results(results, temp_dir) - - attach_scan_metrics(get_current_span(), results, config) - - return results - - except McpError as e: - raise e - except ValidationError as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") - ) from e - except Exception as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}") - ) from e - - finally: - if temp_dir: - # Clean up temporary files - shutil.rmtree(temp_dir, ignore_errors=True) - - -@with_tool_span() -async def semgrep_scan_rpc( - ctx: Context, - code_files: list[CodeFile], -) -> CliOutput: - """ - Runs a Semgrep scan on provided code content using the new Semgrep RPC feature. - - This should run much faster than the comparative `semgrep_scan` tool. - """ - - temp_dir = None - try: - # TODO: perhaps should return more interpretable results? - context: SemgrepContext = ctx.request_context.lifespan_context - cli_output = await run_semgrep_via_rpc(context, code_files) - - attach_rpc_scan_metrics(get_current_span(), cli_output) - - return cli_output - except McpError as e: - raise e - except ValidationError as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") - ) from e - except Exception as e: - raise McpError( - ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}") - ) from e - - finally: - if temp_dir: - # Clean up temporary files - shutil.rmtree(temp_dir, ignore_errors=True) - - -async def semgrep_scan_core( - ctx: Context, - code_files: list[CodeFile], - config: str | None = CONFIG_FIELD, -) -> SemgrepScanResult | CliOutput: - """ - Runs a Semgrep scan on provided CodeFile objects and returns the findings in JSON format - - Depending on whether `USE_SEMGREP_RPC` is set, this tool will either run a `pysemgrep` - CLI scan, or an RPC-based scan. - - Respectively, this will cause us to return either a `SemgrepScanResult` or a `CliOutput`. - """ - - context: SemgrepContext = ctx.request_context.lifespan_context - - paths = [cf.path for cf in code_files] - - if context.process is not None: - if config is not None: - # This should hopefully just cause the agent to call us back with - # the correct parameters. - raise McpError( - ErrorData( - code=INVALID_PARAMS, - message=""" - `config` is not supported when using the RPC-based scan. - Try calling again without that parameter set? - """, - ) - ) - - logging.info(f"Running RPC-based scan on paths: {paths}") - return await semgrep_scan_rpc(ctx, code_files) - else: - logging.info(f"Running CLI-based scan on paths: {paths}") - return await semgrep_scan_cli(ctx, code_files, config) - - -@mcp.tool() -@with_tool_span() -async def semgrep_scan_remote( - ctx: Context, - code_files: list[dict[str, str]] = REMOTE_CODE_FILES_FIELD, - # TODO: currently only for CLI-based scans - config: str | None = CONFIG_FIELD, -) -> SemgrepScanResult | CliOutput: - """ - Runs a Semgrep scan on provided code content and returns the findings in JSON format - - Use this tool when you need to: - - scan code files for security vulnerabilities - - scan code files for other issues - """ - - # Implementer's note: - # This is one possible entry point for regular scanning, depending on whether - # the server is remotely hosted or not. - # If the server is hosted, only this tool will be available, and not the - # `semgrep_scan` tool. - - validated_code_files = validate_remote_files(code_files) - - return await semgrep_scan_core(ctx, validated_code_files, config) - - -@mcp.tool() -@with_tool_span() -async def semgrep_scan( - ctx: Context, - code_files: list[dict[str, str]] = LOCAL_CODE_FILES_FIELD, - config: str | None = CONFIG_FIELD, -) -> SemgrepScanResult | CliOutput: - """ - Runs a Semgrep scan locally on provided code files returns the findings in JSON format. - - Files are expected to be absolute paths to the code files. - - Use this tool when you need to: - - scan code files for security vulnerabilities - - scan code files for other issues - """ - - # Implementer's note: - # This is one possible entry point for regular scanning, depending on whether - # the server is remotely hosted or not. - # If the server is local, only this tool will be available, and not the - # `semgrep_scan_remote` tool. - - validated_local_files = validate_local_files(code_files) - - return await semgrep_scan_core(ctx, validated_local_files, config) - - -# --------------------------------------------------------------------------------- -# MCP Prompts -# --------------------------------------------------------------------------------- - - -@mcp.prompt() -def setup_semgrep_mcp(): - prompt_template = """ - You are Cursor automations. Do the following in order and confirm each step: - - 1) Install and authenticate Semgrep: - - Check if Semgrep is installed by running `semgrep --version`. - - If Semgrep is not installed, install it by running `brew install semgrep`. - - Once Semgrep is installed, run non-interactively: - `semgrep login && semgrep install-semgrep-pro || true` - - 2) Add a Cursor rule file at `.cursor/rules/semgrep.mdc` with the following exact content: - --- - alwaysApply: true - --- - - Always look first to any semgrep mcp servers for code security needs - - Whenever you generate code or commands, you MUST first ensure safety by scanning the - content with the semgrep_scan tool - - 3) Report back: - - Confirm the `.cursor/rules/semgrep.mdc` file exists with the specified content. - - Confirm Semgrep login/install status by running `semgrep --pro --version`. - - """ - return prompt_template - - -@mcp.prompt() -def write_custom_semgrep_rule( - code: str = Field(description="The code to get the AST for"), - language: str = Field(description="The programming language of the code"), -) -> str: - """ - Write a custom Semgrep rule for the provided code and language - - Use this prompt when you need to: - - write a custom Semgrep rule - - write a Semgrep rule for a specific issue or pattern - """ - - prompt_template = """You are an expert at writing Semgrep rules. - -Your task is to analyze a given piece of code and create a Semgrep rule -that can detect specific patterns or issues within that code. -Semgrep is a lightweight static analysis tool that uses pattern matching -to find bugs and enforce code standards. - -Here is the code you need to analyze: - - -{code} - - -The code is written in the following programming language: - - -{language} - - -To write an effective Semgrep rule, follow these guidelines: -1. Identify a specific pattern, vulnerability, or -coding standard violation in the given code. -2. Create a rule that matches this pattern as precisely as possible. -3. Use Semgrep's pattern syntax, which is similar to the target language -but with metavariables and ellipsis operators where appropriate. -4. Consider the context and potential variations of the pattern you're trying to match. -5. Provide a clear and concise message that explains what the rule detects. -6. The value of the `severity` must be one of the following: - - "ERROR" - - "WARNING" - - "INFO" - - "INVENTORY" - - "EXPERIMENT" - - "CRITICAL" - - "HIGH" - - "MEDIUM" - - "LOW" - -7. The value of the `languages` must be a list of languages that the rule is applicable -to and include the language given in tags. - - -Write your Semgrep rule in YAML format. The rule should include at least the following keys: -- rules -- id -- pattern -- message -- severity -- languages - -Before providing the rule, briefly explain in a few sentences what specific issue or -pattern your rule is designed to detect and why it's important. - -Then, output your Semgrep rule inside tags. - -Ensure that the rule is properly formatted in YAML. -Make sure to include all the required keys and values in the rule.""" - - return prompt_template.format(code=code, language=language) - - -# --------------------------------------------------------------------------------- +## --------------------------------------------------------------------------------- # MCP Resources # --------------------------------------------------------------------------------- @@ -1063,37 +416,6 @@ async def health(request: Request) -> JSONResponse: return JSONResponse({"status": "ok", "version": __version__}) -# --------------------------------------------------------------------------------- -# Disabling tools -# --------------------------------------------------------------------------------- - -TOOL_DISABLE_ENV_VARS = { - "SEMGREP_RULE_SCHEMA_DISABLED": "semgrep_rule_schema", - "GET_SUPPORTED_LANGUAGES_DISABLED": "get_supported_languages", - "SEMGREP_FINDINGS_DISABLED": "semgrep_findings", - "SEMGREP_SCAN_WITH_CUSTOM_RULE_DISABLED": "semgrep_scan_with_custom_rule", - "SEMGREP_SCAN_DISABLED": "semgrep_scan", - "SEMGREP_SCAN_REMOTE_DISABLED": "semgrep_scan_remote", - "GET_ABSTRACT_SYNTAX_TREE_DISABLED": "get_abstract_syntax_tree", -} - - -def deregister_tools() -> None: - for env_var, tool_name in TOOL_DISABLE_ENV_VARS.items(): - is_disabled = os.environ.get(env_var, "false").lower() == "true" - - if is_disabled: - # for the time being, while there is no way to API-level remove tools, - # we'll just mutate the internal `_tools`, because this language does - # not stop us from doing so - del mcp._tool_manager._tools[tool_name] - - if is_hosted(): - del mcp._tool_manager._tools["semgrep_scan"] - else: - del mcp._tool_manager._tools["semgrep_scan_remote"] - - # --------------------------------------------------------------------------------- # MCP Server Entry Point # --------------------------------------------------------------------------------- @@ -1136,9 +458,6 @@ def main(transport: str, semgrep_path: str | None) -> None: if semgrep_path: set_semgrep_executable(semgrep_path) - # based on env vars, disable certain tools - deregister_tools() - if transport == "stdio": mcp.run(transport="stdio") elif transport == "streamable-http": diff --git a/tests/integration/test_claude_code_integration.py b/tests/integration/test_claude_code_integration.py deleted file mode 100644 index 9fe993f..0000000 --- a/tests/integration/test_claude_code_integration.py +++ /dev/null @@ -1,228 +0,0 @@ -"""Test Claude Code integration and global MCP server registration.""" - -import json -import os -import pathlib -import subprocess -import tempfile -from unittest import mock - -import pytest - - -def is_claude_cli_available() -> bool: - """Check if Claude CLI is available in the system.""" - try: - result = subprocess.run(["claude", "--version"], capture_output=True, text=True, timeout=10) - return result.returncode == 0 - except (FileNotFoundError, subprocess.TimeoutExpired): - return False - - -# Skip entire test class if Claude CLI is not available -claude_cli_available = is_claude_cli_available() -skip_reason = "Claude CLI not available - install from https://claude.ai/code" - - -class TestClaudeCodeIntegration: - """Test suite for Claude Code MCP server global registration.""" - - @pytest.mark.skipif(not claude_cli_available, reason=skip_reason) - def test_claude_code_config_exists(self): - """Test that Claude Code configuration exists in the expected location.""" - config_path = pathlib.Path.home() / ".claude.json" - assert config_path.exists(), f"Claude Code MCP config not found at {config_path}" - - @pytest.mark.skipif(not claude_cli_available, reason=skip_reason) - def test_claude_code_config_format(self): - """Test that the Claude Code configuration has the correct format.""" - config_path = pathlib.Path.home() / ".claude.json" - - with open(config_path) as f: - config = json.load(f) - - assert "mcpServers" in config, "Config missing 'mcpServers' key" - assert "semgrep-mcp" in config["mcpServers"], "Config missing 'semgrep-mcp' server" - - server_config = config["mcpServers"]["semgrep-mcp"] - assert "command" in server_config, "Server config missing 'command'" - assert "args" in server_config, "Server config missing 'args'" - - @pytest.mark.skipif(not claude_cli_available, reason=skip_reason) - def test_claude_code_server_command(self): - """Test that the configured server command is valid.""" - config_path = pathlib.Path.home() / ".claude.json" - - with open(config_path) as f: - config = json.load(f) - - server_config = config["mcpServers"]["semgrep-mcp"] - command = server_config["command"] - args = server_config["args"] - - # Check that the command exists - result = subprocess.run(["which", command], capture_output=True, text=True) - assert result.returncode == 0, f"Command '{command}' not found in PATH" - - # Extract working directory from args (--directory argument) - cwd = None - for i, arg in enumerate(args): - if arg == "--directory" and i + 1 < len(args): - cwd = args[i + 1] - break - - if cwd: - # Check that the working directory exists - assert os.path.isdir(cwd), f"Working directory '{cwd}' does not exist" - - # Check that the command can be executed (dry run) - result = subprocess.run( - [command, *args, "--help"], capture_output=True, text=True, timeout=10 - ) - # Either help works or the command exists but doesn't support --help - cmd_str = f"Command '{command} {' '.join(args)}' failed to execute" - assert result.returncode in [0, 1, 2], cmd_str - - def test_makefile_configure_command(self): - """Test that the Makefile configure-claude-code command works correctly.""" - # Create a temporary config directory - with tempfile.TemporaryDirectory() as temp_dir: - # Mock the home directory for this test - with mock.patch("pathlib.Path.home", return_value=pathlib.Path(temp_dir)): - # Run the makefile command - result = subprocess.run( - ["make", "configure-claude-code"], - cwd=pathlib.Path(__file__).parent.parent.parent, - capture_output=True, - text=True, - env={**os.environ, "HOME": temp_dir}, - ) - - # Check that the command succeeded - assert result.returncode == 0, f"Makefile configure command failed: {result.stderr}" - - # Check that the config was created - config_file = pathlib.Path(temp_dir) / ".claude.json" - assert config_file.exists(), "Config file was not created" - - # Verify the config content - with open(config_file) as f: - config = json.load(f) - - assert "mcpServers" in config - assert "semgrep-mcp" in config["mcpServers"] - server_config = config["mcpServers"]["semgrep-mcp"] - assert server_config["command"] == "uv" - # Check that args contain the expected elements - args = server_config["args"] - assert "run" in args - assert "semgrep-mcp" in args - assert "--directory" in args - - @pytest.mark.skipif(not claude_cli_available, reason=skip_reason) - def test_makefile_check_command(self): - """Test that the Makefile check-claude-config command works correctly.""" - result = subprocess.run( - ["make", "check-claude-config"], - cwd=pathlib.Path(__file__).parent.parent.parent, - capture_output=True, - text=True, - ) - - # Command should succeed - assert result.returncode == 0, f"Makefile check command failed: {result.stderr}" - - # Should contain configuration information - assert "MCP configuration" in result.stdout - - # If config exists, should show the semgrep-mcp server - if "semgrep-mcp" in result.stdout: - assert "semgrep-mcp" in result.stdout - - @pytest.mark.skipif(not claude_cli_available, reason=skip_reason) - def test_server_can_be_launched(self): - """Test that the MCP server can be launched with the configured command.""" - config_path = pathlib.Path.home() / ".claude.json" - - with open(config_path) as f: - config = json.load(f) - - server_config = config["mcpServers"]["semgrep-mcp"] - command = server_config["command"] - args = server_config["args"] - - # Extract working directory from args (--directory argument) - cwd = None - for i, arg in enumerate(args): - if arg == "--directory" and i + 1 < len(args): - cwd = args[i + 1] - break - - # Try to launch the server and check that it starts - process = subprocess.Popen( - [command, *args], - cwd=cwd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - ) - - try: - # Send a basic initialization message - init_message = { - "jsonrpc": "2.0", - "id": 1, - "method": "initialize", - "params": { - "protocolVersion": "2024-11-05", - "capabilities": {}, - "clientInfo": {"name": "test-client", "version": "1.0.0"}, - }, - } - - process.stdin.write(json.dumps(init_message) + "\n") - process.stdin.flush() - - # Wait for response (timeout after 5 seconds) - try: - stdout, _ = process.communicate(timeout=5) - - # Check that we got a response - assert stdout, "No response from MCP server" - - # Parse the response - response = json.loads(stdout.strip()) - assert "result" in response or "error" in response, "Invalid JSON-RPC response" - - except subprocess.TimeoutExpired: - process.kill() - pytest.fail("MCP server did not respond within timeout") - - finally: - if process.poll() is None: - process.terminate() - process.wait() - - @pytest.mark.skipif(not claude_cli_available, reason=skip_reason) - def test_global_registration_persists(self): - """Test that the global registration persists across system restarts.""" - config_path = pathlib.Path.home() / ".claude.json" - - # Verify the config exists - assert config_path.exists(), "Global MCP config does not exist" - - # Verify the config is readable - with open(config_path) as f: - config = json.load(f) - - # Verify the semgrep-mcp server is configured - assert "mcpServers" in config - assert "semgrep-mcp" in config["mcpServers"] - - # Verify the configuration is complete - server_config = config["mcpServers"]["semgrep-mcp"] - required_keys = ["command", "args"] - for key in required_keys: - assert key in server_config, f"Missing required key '{key}' in server config" - assert server_config[key], f"Empty value for required key '{key}'" diff --git a/tests/integration/test_create_temp_files.py b/tests/integration/test_create_temp_files.py deleted file mode 100644 index 88d5a8b..0000000 --- a/tests/integration/test_create_temp_files.py +++ /dev/null @@ -1,121 +0,0 @@ -import os -import shutil - -import pytest - -from semgrep_mcp.server import CodeFile, McpError, create_temp_files_from_code_content - - -def test_create_temp_files_from_code_content(): - """Test that create_temp_files_from_code_content correctly creates temp files with content""" - # Define test code files - code_files = [ - CodeFile(path="test_file.py", content="print('Hello, world!')"), - CodeFile(path="nested/path/test_file.js", content="console.log('Hello, world!');"), - CodeFile(path="special chars/file with spaces.txt", content="Hello, world!"), - ] - - # Call the function - temp_dir = None - try: - temp_dir = create_temp_files_from_code_content(code_files) - - # Check if temp directory was created - assert os.path.exists(temp_dir) - assert os.path.isdir(temp_dir) - - # Check if files were created with correct content - for code_file in code_files: - file_path = os.path.join(temp_dir, code_file.path) - assert os.path.exists(file_path) - with open(file_path) as f: - content = f.read() - assert content == code_file.content - - # Check that nested directories were created - assert os.path.exists(os.path.join(temp_dir, "nested/path")) - assert os.path.exists(os.path.join(temp_dir, "special chars")) - - finally: - # Clean up - if temp_dir and os.path.exists(temp_dir): - shutil.rmtree(temp_dir, ignore_errors=True) - - -def test_create_temp_files_from_code_content_empty_list(): - """Test that create_temp_files_from_code_content handles empty file list""" - code_files = [] - - temp_dir = None - try: - temp_dir = create_temp_files_from_code_content(code_files) - - # Check if temp directory was created - assert os.path.exists(temp_dir) - assert os.path.isdir(temp_dir) - - # Directory should be empty (except for potential system files like .DS_Store) - # Just check that no files were created from our empty list - entries = os.listdir(temp_dir) - assert all( - not os.path.isfile(os.path.join(temp_dir, entry)) or entry.startswith(".") - for entry in entries - ) - - finally: - # Clean up - if temp_dir and os.path.exists(temp_dir): - shutil.rmtree(temp_dir, ignore_errors=True) - - -def test_create_temp_files_from_code_content_empty_filename(): - """Test that create_temp_files_from_code_content handles empty filenames""" - code_files = [ - CodeFile(path="", content="This content should be skipped"), - CodeFile(path="valid_file.txt", content="This is valid content"), - ] - - temp_dir = None - try: - temp_dir = create_temp_files_from_code_content(code_files) - - # Check if temp directory was created - assert os.path.exists(temp_dir) - assert os.path.isdir(temp_dir) - - # The empty filename should be skipped - we can't directly check for a file with empty name - # because os.path.join(temp_dir, "") just returns temp_dir - # Instead, we'll check that only the valid file exists in the directory - files = [ - f - for f in os.listdir(temp_dir) - if os.path.isfile(os.path.join(temp_dir, f)) and not f.startswith(".") - ] - assert len(files) == 1 - assert "valid_file.txt" in files - - # The valid file should be created - valid_file_path = os.path.join(temp_dir, "valid_file.txt") - assert os.path.exists(valid_file_path) - with open(valid_file_path) as f: - content = f.read() - assert content == "This is valid content" - - finally: - # Clean up - if temp_dir and os.path.exists(temp_dir): - shutil.rmtree(temp_dir, ignore_errors=True) - - -def test_create_temp_files_from_code_content_path_traversal(): - """Test that create_temp_files_from_code_content prevents path traversal""" - # Define test code files with path traversal attempts - code_files = [ - CodeFile(path="../attempt_to_write_outside.txt", content="This should fail"), - CodeFile(path="subdir/../../../etc/passwd", content="This should fail too"), - CodeFile(path="/absolute/path/file.txt", content="This should fail as well"), - ] - - # The function should raise a ValueError for path traversal attempts - with pytest.raises(McpError): - create_temp_files_from_code_content(code_files) diff --git a/tests/integration/test_local_scan.py b/tests/integration/test_local_scan.py deleted file mode 100644 index 28858f0..0000000 --- a/tests/integration/test_local_scan.py +++ /dev/null @@ -1,62 +0,0 @@ -import json -import os -import subprocess -import time -from pathlib import Path -from tempfile import NamedTemporaryFile - -import pytest -from mcp.client.session import ClientSession -from mcp.client.streamable_http import streamablehttp_client - -base_url = os.getenv("MCP_BASE_URL", "http://127.0.0.1:8000") - - -@pytest.fixture(scope="module") -def streamable_server(): - # Start the streamable-http server - proc = subprocess.Popen( - ["python", "src/semgrep_mcp/server.py", "-t", "streamable-http"], - ) - # Wait briefly to ensure the server starts - time.sleep(5) - yield - # Teardown: terminate the server - proc.terminate() - proc.wait() - - -@pytest.mark.asyncio -async def test_local_scan(streamable_server): - async with streamablehttp_client(f"{base_url}/mcp") as (read_stream, write_stream, _): - async with ClientSession(read_stream, write_stream) as session: - # Initializing session... - await session.initialize() - # Session initialized - - with NamedTemporaryFile( - "w", prefix="hello_world", suffix=".py", encoding="utf-8" - ) as tmp: - tmp.write("def hello(): print('Hello, World!')") - tmp.flush() - - path = tmp.name - - # Scan code for security issues using local semgrep_scan - results = await session.call_tool( - "semgrep_scan", - { - "code_files": [ - { - "path": str(Path(path).absolute()), - } - ] - }, - ) - # We have results! - assert results is not None - content = json.loads(results.content[0].text) # type: ignore - assert isinstance(content, dict) - assert len(content["paths"]["scanned"]) == 1 - assert content["paths"]["scanned"][0].startswith("hello_world") - print(json.dumps(content, indent=2)) diff --git a/tests/integration/test_semgrep_findings.py b/tests/integration/test_semgrep_findings.py deleted file mode 100644 index cd8d06d..0000000 --- a/tests/integration/test_semgrep_findings.py +++ /dev/null @@ -1,54 +0,0 @@ -import os - -import pytest -from mcp import ClientSession, StdioServerParameters -from mcp.client.stdio import stdio_client - -from semgrep_mcp.models import Finding - - -@pytest.mark.asyncio -@pytest.mark.skipif( - not os.environ.get("SEMGREP_APP_TOKEN"), - reason="SEMGREP_APP_TOKEN not set; skipping integration test.", -) -async def test_semgrep_findings_sca(): - server_params = StdioServerParameters( - command="python", - args=["src/semgrep_mcp/server.py"], - env={**os.environ}, - ) - - async with stdio_client(server_params) as (read, write): - async with ClientSession(read, write) as session: - await session.initialize() - results = await session.call_tool("semgrep_findings", {"issue_type": ["sca"]}) - assert results is not None - - # Validate findings against the model - for content in results.content: - Finding.model_validate_json(content.text) - - -@pytest.mark.asyncio -@pytest.mark.skipif( - not os.environ.get("SEMGREP_APP_TOKEN"), - reason="SEMGREP_APP_TOKEN not set; skipping integration test.", -) -async def test_semgrep_findings_sast(): - server_params = StdioServerParameters( - command="python", - args=["src/semgrep_mcp/server.py"], - env={**os.environ}, - ) - - async with stdio_client(server_params) as (read, write): - async with ClientSession(read, write) as session: - await session.initialize() - results = await session.call_tool("semgrep_findings", {"issue_type": ["sast", "sca"]}) - assert results is not None - - # Validate findings against the model - for content in results.content: - finding = Finding.model_validate_json(content.text) - print(finding) diff --git a/tests/integration/test_sse_client.py b/tests/integration/test_sse_client.py deleted file mode 100644 index 96f2c47..0000000 --- a/tests/integration/test_sse_client.py +++ /dev/null @@ -1,55 +0,0 @@ -import json -import os -import subprocess -import time - -import pytest -from mcp.client.session import ClientSession -from mcp.client.sse import sse_client - -base_url = os.getenv("MCP_BASE_URL", "http://127.0.0.1:8000") - -print(f"MCP_BASE_URL: {base_url}") - - -@pytest.fixture(scope="module") -def sse_server(): - # Start the SSE server - proc = subprocess.Popen( - ["python", "src/semgrep_mcp/server.py", "-t", "sse"], - env={"SEMGREP_IS_HOSTED": "true", **os.environ}, - ) - # Wait briefly to ensure the server starts - time.sleep(5) - yield - # Teardown: terminate the server - proc.terminate() - proc.wait() - - -@pytest.mark.asyncio -async def test_sse_client_smoke(sse_server): - async with sse_client(f"{base_url}/sse") as (read_stream, write_stream): - async with ClientSession(read_stream, write_stream) as session: - # Initializing session... - await session.initialize() - # Session initialized - - # Scan code for security issues - results = await session.call_tool( - "semgrep_scan_remote", - { - "code_files": [ - { - "path": "hello_world.py", - "content": "def hello(): print('Hello, World!')", - } - ] - }, - ) - # We have results! - assert results is not None - content = json.loads(results.content[0].text) - assert isinstance(content, dict) - assert content["paths"]["scanned"] == ["hello_world.py"] - print(json.dumps(content, indent=2)) diff --git a/tests/integration/test_stdio_client.py b/tests/integration/test_stdio_client.py deleted file mode 100644 index f744c29..0000000 --- a/tests/integration/test_stdio_client.py +++ /dev/null @@ -1,62 +0,0 @@ -import json -import os - -import pytest -from mcp import ClientSession, StdioServerParameters -from mcp.client.stdio import stdio_client - -# Create server parameters for stdio connection -server_params = StdioServerParameters( - command="python", # Executable - args=["src/semgrep_mcp/server.py"], # Optional command line arguments - env={ - "USE_SEMGREP_RPC": "false", - "SEMGREP_IS_HOSTED": "true", - **os.environ, - }, # Optional environment variables -) - - -@pytest.mark.asyncio -async def test_stdio_client(): - async with stdio_client(server_params) as (read, write): - async with ClientSession(read, write) as session: - # Initialize the connection - await session.initialize() - - # List available prompts - prompts = await session.list_prompts() - - print(prompts) - # List available resources - resources = await session.list_resources() - - # List available tools - print(resources) - - tools = await session.list_tools() - - print(tools) - - # Read a resource - print("Reading resource") - content, _ = await session.read_resource("semgrep://rule/schema") - - # Call a tool - results = await session.call_tool( - "semgrep_scan_remote", - { - "code_files": [ - { - "path": "hello_world.py", - "content": "def hello(): print('Hello, World!')", - } - ] - }, - ) - # We have results! - assert results is not None - content = json.loads(results.content[0].text) - assert isinstance(content, dict) - assert content["paths"]["scanned"] == ["hello_world.py"] - print(json.dumps(content, indent=2)) diff --git a/tests/integration/test_streamable_client.py b/tests/integration/test_streamable_client.py deleted file mode 100644 index cafbbd8..0000000 --- a/tests/integration/test_streamable_client.py +++ /dev/null @@ -1,53 +0,0 @@ -import json -import os -import subprocess -import time - -import pytest -from mcp.client.session import ClientSession -from mcp.client.streamable_http import streamablehttp_client - -base_url = os.getenv("MCP_BASE_URL", "http://127.0.0.1:8000") - - -@pytest.fixture(scope="module") -def streamable_server(): - # Start the streamable-http server - proc = subprocess.Popen( - ["python", "src/semgrep_mcp/server.py", "-t", "streamable-http"], - env={"SEMGREP_IS_HOSTED": "true", **os.environ}, - ) - # Wait briefly to ensure the server starts - time.sleep(5) - yield - # Teardown: terminate the server - proc.terminate() - proc.wait() - - -@pytest.mark.asyncio -async def test_streamable_client_smoke(streamable_server): - async with streamablehttp_client(f"{base_url}/mcp") as (read_stream, write_stream, _): - async with ClientSession(read_stream, write_stream) as session: - # Initializing session... - await session.initialize() - # Session initialized - - # Scan code for security issues - results = await session.call_tool( - "semgrep_scan_remote", - { - "code_files": [ - { - "path": "hello_world.py", - "content": "def hello(): print('Hello, World!')", - } - ] - }, - ) - # We have results! - assert results is not None - content = json.loads(results.content[0].text) - assert isinstance(content, dict) - assert content["paths"]["scanned"] == ["hello_world.py"] - print(json.dumps(content, indent=2)) From f398cc69722b39d7454b84fbcf1bf9d4e713264d Mon Sep 17 00:00:00 2001 From: brandonspark Date: Mon, 29 Sep 2025 13:24:28 -0700 Subject: [PATCH 2/3] add more explicit advice --- src/semgrep_mcp/server.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/semgrep_mcp/server.py b/src/semgrep_mcp/server.py index 0d4635d..fcca8db 100755 --- a/src/semgrep_mcp/server.py +++ b/src/semgrep_mcp/server.py @@ -373,6 +373,28 @@ def deprecation_notice() -> str: `semgrep mcp` command. See https://mcp.semgrep.ai/ for more installation instructions. + +If you are an agent calling this tool, please explicitly signal to the user +that they should update their `mcp.json` to use the new MCP server. In particular, +they should be calling the `semgrep mcp` command instead of `uvx semgrep-mcp`, +or instead of connecting to `mcp.semgrep.ai`. Feel free to suggest a change to `mcp.json` +on behalf of the user, if they are using Cursor! + +It should look like this: +```json +{ + "mcpServers": { + "semgrep": { + "command": "semgrep", + "args": ["mcp"] + } + } +} +``` + +NOTE: You should not attempt to call `semgrep` manually via the CLI, as +it is better for the user to use the new Semgrep MCP server. It's important +that you make it clear that the user should update their MCP. """ From 00a66d870d72bee25650283dceba3a448bef7857 Mon Sep 17 00:00:00 2001 From: brandonspark Date: Mon, 29 Sep 2025 13:25:56 -0700 Subject: [PATCH 3/3] remove integration tests --- .github/workflows/test.yml | 39 +------------------------------------- 1 file changed, 1 insertion(+), 38 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4b0ad96..615bcb4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -36,7 +36,7 @@ jobs: - name: Run unit tests with pytest run: | pytest tests/unit/ --doctest-modules --junitxml=junit/unit-test-results-${{ matrix.python-version }}.xml - + - name: Upload unit test results if: ${{ always() }} uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2 @@ -44,40 +44,3 @@ jobs: name: unit-test-results-${{ matrix.python-version }} path: junit/unit-test-results-${{ matrix.python-version }}.xml retention-days: 7 - - integration-tests: - name: Integration Tests - runs-on: ubuntu-latest - strategy: - fail-fast: false - matrix: - python-version: ["3.10", "3.12", "3.13"] - - steps: - - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 - with: - submodules: true - - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@8d9ed9ac5c53483de85588cdf95a591a75ab9f55 # v5.5.0 - with: - python-version: ${{ matrix.python-version }} - cache: 'pip' - - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install -e . - pip install pytest pytest-cov pytest-asyncio - - - name: Run integration tests with pytest - run: | - pytest tests/integration/ --doctest-modules --junitxml=junit/integration-test-results-${{ matrix.python-version }}.xml - - - name: Upload integration test results - if: ${{ always() }} - uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2 - with: - name: integration-test-results-${{ matrix.python-version }} - path: junit/integration-test-results-${{ matrix.python-version }}.xml - retention-days: 7