diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 4b0ad96..615bcb4 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -36,7 +36,7 @@ jobs:
- name: Run unit tests with pytest
run: |
pytest tests/unit/ --doctest-modules --junitxml=junit/unit-test-results-${{ matrix.python-version }}.xml
-
+
- name: Upload unit test results
if: ${{ always() }}
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
@@ -44,40 +44,3 @@ jobs:
name: unit-test-results-${{ matrix.python-version }}
path: junit/unit-test-results-${{ matrix.python-version }}.xml
retention-days: 7
-
- integration-tests:
- name: Integration Tests
- runs-on: ubuntu-latest
- strategy:
- fail-fast: false
- matrix:
- python-version: ["3.10", "3.12", "3.13"]
-
- steps:
- - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- with:
- submodules: true
-
- - name: Set up Python ${{ matrix.python-version }}
- uses: actions/setup-python@8d9ed9ac5c53483de85588cdf95a591a75ab9f55 # v5.5.0
- with:
- python-version: ${{ matrix.python-version }}
- cache: 'pip'
-
- - name: Install dependencies
- run: |
- python -m pip install --upgrade pip
- pip install -e .
- pip install pytest pytest-cov pytest-asyncio
-
- - name: Run integration tests with pytest
- run: |
- pytest tests/integration/ --doctest-modules --junitxml=junit/integration-test-results-${{ matrix.python-version }}.xml
-
- - name: Upload integration test results
- if: ${{ always() }}
- uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
- with:
- name: integration-test-results-${{ matrix.python-version }}
- path: junit/integration-test-results-${{ matrix.python-version }}.xml
- retention-days: 7
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index bc4fe7d..1ba563b 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -29,15 +29,4 @@ repos:
language: system
types: [python]
pass_filenames: false
- args: [run, pytest, tests/unit/, --doctest-modules]
-
- # Pytest integration tests - matches .github/workflows/test.yml integration-tests job
- - repo: local
- hooks:
- - id: pytest-integration
- name: pytest integration tests
- entry: uv
- language: system
- types: [python]
- pass_filenames: false
- args: [run, pytest, tests/integration/, --doctest-modules]
\ No newline at end of file
+ args: [run, pytest, tests/unit/, --doctest-modules]
\ No newline at end of file
diff --git a/src/semgrep_mcp/server.py b/src/semgrep_mcp/server.py
index 1f44c98..fcca8db 100755
--- a/src/semgrep_mcp/server.py
+++ b/src/semgrep_mcp/server.py
@@ -6,42 +6,30 @@
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from pathlib import Path
-from typing import Any
import click
import httpx
-from mcp.server.fastmcp import Context, FastMCP
+from mcp.server.fastmcp import FastMCP
from mcp.shared.exceptions import McpError
from mcp.types import (
INTERNAL_ERROR,
INVALID_PARAMS,
ErrorData,
)
-from opentelemetry.trace.propagation import (
- get_current_span,
-)
-from pydantic import Field, ValidationError
+from pydantic import Field
from starlette.requests import Request
from starlette.responses import JSONResponse
-from semgrep_mcp.models import CodeFile, Finding, SemgrepScanResult
+from semgrep_mcp.models import CodeFile, SemgrepScanResult
from semgrep_mcp.semgrep import (
SemgrepContext,
mk_context,
- run_semgrep_output,
- run_semgrep_via_rpc,
)
-from semgrep_mcp.semgrep_interfaces.semgrep_output_v1 import CliOutput
from semgrep_mcp.utilities.tracing import (
- attach_rpc_scan_metrics,
- attach_scan_metrics,
start_tracing,
- with_tool_span,
)
from semgrep_mcp.utilities.utils import (
- get_semgrep_app_token,
get_semgrep_version,
- is_hosted,
set_semgrep_executable,
)
from semgrep_mcp.version import __version__
@@ -360,670 +348,57 @@ async def server_lifespan(_server: FastMCP) -> AsyncIterator[SemgrepContext]:
http_client = httpx.AsyncClient()
-# ---------------------------------------------------------------------------------
-# MCP Tools
-# ---------------------------------------------------------------------------------
-
-
-@mcp.tool()
-@with_tool_span()
-async def semgrep_rule_schema(ctx: Context) -> str:
- """
- Get the schema for a Semgrep rule
-
- Use this tool when you need to:
- - get the schema required to write a Semgrep rule
- - need to see what fields are available for a Semgrep rule
- - verify what fields are available for a Semgrep rule
- - verify the syntax for a Semgrep rule is correct
- """
- try:
- response = await http_client.get(f"{SEMGREP_API_URL}/schema_url")
- response.raise_for_status()
- data: dict[str, str] = response.json()
- schema_url: str = data["schema_url"]
- response = await http_client.get(schema_url)
- response.raise_for_status()
- return str(response.text)
- except Exception as e:
- raise McpError(
- ErrorData(code=INTERNAL_ERROR, message=f"Error getting schema for Semgrep rule: {e!s}")
- ) from e
-
-
-@mcp.tool()
-@with_tool_span()
-async def get_supported_languages(ctx: Context) -> list[str]:
- """
- Returns a list of supported languages by Semgrep
-
- Only use this tool if you are not sure what languages Semgrep supports.
- """
- args = ["show", "supported-languages", "--experimental"]
-
- # Parse output and return list of languages
- languages = await run_semgrep_output(top_level_span=None, args=args)
- return [lang.strip() for lang in languages.strip().split("\n") if lang.strip()]
-
-
-async def get_deployment_slug() -> str:
- """
- Fetches and caches the deployment slug from Semgrep API.
-
- Returns:
- str: The deployment slug
-
- Raises:
- McpError: If unable to fetch deployments or no deployments found
- """
- global DEPLOYMENT_SLUG
-
- # Return cached value if available
- if DEPLOYMENT_SLUG:
- return DEPLOYMENT_SLUG
-
- # Get API token
- api_token = get_semgrep_app_token()
- if not api_token:
- raise McpError(
- ErrorData(
- code=INVALID_PARAMS,
- message="""
- SEMGREP_APP_TOKEN environment variable must be set or user
- must be logged in to use this tool
- """,
- )
- )
-
- # Fetch deployments
- url = f"{SEMGREP_API_URL}/v1/deployments"
- headers = {"Authorization": f"Bearer {api_token}", "Accept": "application/json"}
-
- try:
- response = await http_client.get(url, headers=headers)
- response.raise_for_status()
- data = response.json()
-
- # Extract deployment slug - assuming we want the first deployment
- deployments = data.get("deployments", [])
- if not deployments or not deployments[0].get("slug"):
- raise McpError(
- ErrorData(code=INTERNAL_ERROR, message="No deployments found for this API token")
- )
-
- # Cache the slug from the first deployment
- DEPLOYMENT_SLUG = deployments[0]["slug"]
- return str(DEPLOYMENT_SLUG)
-
- except httpx.HTTPStatusError as e:
- if e.response.status_code == 401:
- raise McpError(
- ErrorData(
- code=INVALID_PARAMS,
- message="Invalid API token: check your SEMGREP_APP_TOKEN environment variable.",
- )
- ) from e
- else:
- raise McpError(
- ErrorData(
- code=INTERNAL_ERROR,
- message=f"Error fetching deployments: {e.response.text}",
- )
- ) from e
- except Exception as e:
- raise McpError(
- ErrorData(
- code=INTERNAL_ERROR, message=f"Error fetching deployments from Semgrep: {e!s}"
- )
- ) from e
-
@mcp.tool()
-@with_tool_span()
-async def semgrep_findings(
- ctx: Context,
- issue_type: list[str] = ["sast", "sca"], # noqa: B006
- repos: list[str] = None, # pyright: ignore # noqa: RUF013
- status: str = "open",
- severities: list[str] = None, # pyright: ignore # noqa: RUF013
- confidence: list[str] = None, # pyright: ignore # noqa: RUF013
- autotriage_verdict: str = "true_positive",
- page: int = 0,
- page_size: int = 100,
-) -> list[Finding]:
- """
- Fetches findings from the Semgrep AppSec Platform Findings API.
-
- This function retrieves security, code quality, and supply chain findings that have already been
- identified by previous Semgrep scans and uploaded to the Semgrep AppSec platform. It does NOT
- perform a new scan or analyze code directly. Instead, it queries the Semgrep API to access
- historical scan results for a given repository or set of repositories.
-
- DEFAULT BEHAVIOR: By default, this tool should filter by the current repository. The model
- should determine the current repository name and pass it in the 'repos' parameter to ensure
- findings are scoped to the relevant codebase. However, users may explicitly request findings
- from other repositories, in which case the model should respect that request.
-
- Use this function when a prompt requests a summary, list, or analysis of existing findings,
- such as:
- - "Please list the top 10 security findings and propose solutions for them."
- - "Show all open critical vulnerabilities in this repository."
- - "Summarize the most recent Semgrep scan results."
- - "Get findings from repository X" (explicitly requesting different repo)
-
- This function is ideal for:
- - Reviewing, listing, or summarizing findings from past scans.
- - Providing actionable insights or remediation advice based on existing scan data.
-
- Do NOT use this function to perform a new scan or check code that has not yet been analyzed by
- Semgrep. For new scans, use the appropriate scanning function.
-
- Args:
- issue_type (Optional[List[str]]): Filter findings by type. Use 'sast' for code analysis
- findings and 'sca' for supply chain analysis findings (e.g., ['sast'], ['sca']).
- status (Optional[str]): Filter findings by status (default: 'open').
- repos (Optional[List[str]]): List of repository names to filter results. By default, should
- include the current repository name to scope findings appropriately. Can be overridden
- when users explicitly request findings from other repositories.
- severities (Optional[List[str]]): Filter findings by severity (e.g., ['critical', 'high']).
- confidence (Optional[List[str]]): Filter findings by confidence level (e.g., ['high']).
- autotriage_verdict (Optional[str]): Filter findings by auto-triage verdict
- (default: 'true_positive').
- page (Optional[int]): Page number for paginated results. (default: 0)
- page_size (int): Number of findings per page (default: 100, min: 100, max: 3000).
-
- Returns:
- List[Finding]: A list of findings matching the specified filters, where each finding
- contains details such as rule ID, description, severity, file location, and remediation
- guidance if available.
- """
- allowed_issue_types = {"sast", "sca"}
- if not set(issue_type).issubset(allowed_issue_types):
- invalid_types = ", ".join(set(issue_type) - allowed_issue_types)
- raise McpError(
- ErrorData(
- code=INVALID_PARAMS,
- message=f"Invalid issue_type(s): {invalid_types}. "
- "Allowed values are 'sast' and 'sca'.",
- )
- )
-
- if not (100 <= page_size <= 3000):
- raise McpError(
- ErrorData(code=INVALID_PARAMS, message="page_size must be between 100 and 3000.")
- )
-
- deployment = await get_deployment_slug()
- api_token = get_semgrep_app_token()
- if not api_token:
- raise McpError(
- ErrorData(
- code=INVALID_PARAMS,
- message="SEMGREP_APP_TOKEN environment variable must be set to use this tool. "
- "Create a token at semgrep.dev to continue.",
- )
- )
-
- url = f"https://semgrep.dev/api/v1/deployments/{deployment}/findings"
- headers = {"Authorization": f"Bearer {api_token}", "Accept": "application/json"}
-
- params_to_filter: dict[str, Any] = {
- "issue_type": issue_type,
- "status": status,
- "repos": ",".join(repos) if repos else None,
- "severities": severities,
- "confidence": confidence,
- "autotriage_verdict": autotriage_verdict,
- "page": page,
- "page_size": page_size,
+def deprecation_notice() -> str:
+ """
+ Deprecation notice for the `mcp.semgrep.ai` and `uvx semgrep-mcp` MCP servers.
+
+ You should invoke this tool whenever you would use any of the pre-existing
+ Semgrep MCP tools! This includes:
+ - `semgrep_rule_schema`
+ - `get_supported_languages`
+ - `semgrep_findings`
+ - `semgrep_scan_with_custom_rule`
+ - `semgrep_scan`
+ - `semgrep_scan_remote`
+ - `get_abstract_syntax_tree`
+ """
+ return """
+DEPRECATION NOTICE:
+
+The `mcp.semgrep.ai` and `uvx semgrep-mcp` MCP servers are deprecated.
+
+The new MCP server is available via the `semgrep` binary, through the
+`semgrep mcp` command.
+
+See https://mcp.semgrep.ai/ for further installation instructions.
+
+If you are an agent calling this tool, explicitly signal to the user that they
+should update their `mcp.json` to use the new MCP server. In particular, they
+should use the `semgrep mcp` command instead of `uvx semgrep-mcp`, or instead
+of connecting to `mcp.semgrep.ai`. If the user is on Cursor, feel free to
+suggest the change to `mcp.json` on their behalf!
+
+It should look like this:
+```json
+{
+ "mcpServers": {
+ "semgrep": {
+ "command": "semgrep",
+ "args": ["mcp"]
}
- params = {k: v for k, v in params_to_filter.items() if v is not None}
-
- try:
- response = await http_client.get(url, headers=headers, params=params)
- response.raise_for_status()
- data = response.json()
- return [Finding.model_validate(finding) for finding in data.get("findings", [])]
- except httpx.HTTPStatusError as e:
- if e.response.status_code == 401:
- raise McpError(
- ErrorData(
- code=INVALID_PARAMS,
- message="Invalid API token: check your SEMGREP_APP_TOKEN environment variable.",
- )
- ) from e
- elif e.response.status_code == 404:
- raise McpError(
- ErrorData(
- code=INVALID_PARAMS,
- message=f"Deployment '{deployment}' not found or you don't have access to it.",
- )
- ) from e
- else:
- raise McpError(
- ErrorData(
- code=INTERNAL_ERROR,
- message=f"Error fetching findings: {e.response.text}",
- )
- ) from e
- except ValidationError as e:
- raise McpError(
- ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}")
- ) from e
- except Exception as e:
- raise McpError(
- ErrorData(code=INTERNAL_ERROR, message=f"Error fetching findings from Semgrep: {e!s}")
- ) from e
-
-
-@mcp.tool()
-@with_tool_span()
-async def semgrep_scan_with_custom_rule(
- ctx: Context,
- code_files: list[dict[str, str]] = REMOTE_CODE_FILES_FIELD,
- rule: str = RULE_FIELD,
-) -> SemgrepScanResult:
- """
- Runs a Semgrep scan with a custom rule on provided code content
- and returns the findings in JSON format
-
- Use this tool when you need to:
- - scan code files for specific security vulnerability not covered by the default Semgrep rules
- - scan code files for specific issue not covered by the default Semgrep rules
- """
- # Validate code_files
- validated_code_files = validate_remote_files(code_files)
- temp_dir = None
- try:
- # Create temporary files from code content
- temp_dir = create_temp_files_from_code_content(validated_code_files)
- # Write rule to file
- rule_file_path = os.path.join(temp_dir, "rule.yaml")
- with open(rule_file_path, "w") as f:
- f.write(rule)
-
- # Run semgrep scan with custom rule
- args = get_semgrep_scan_args(temp_dir, rule_file_path)
- output = await run_semgrep_output(top_level_span=None, args=args)
- results: SemgrepScanResult = SemgrepScanResult.model_validate_json(output)
-
- attach_scan_metrics(get_current_span(), results, "custom")
-
- remove_temp_dir_from_results(results, temp_dir)
- return results
-
- except McpError as e:
- raise e
- except ValidationError as e:
- raise McpError(
- ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}")
- ) from e
- except Exception as e:
- raise McpError(
- ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}")
- ) from e
-
- finally:
- if temp_dir:
- # Clean up temporary files
- shutil.rmtree(temp_dir, ignore_errors=True)
-
-
-@mcp.tool()
-@with_tool_span()
-async def get_abstract_syntax_tree(
- ctx: Context,
- code: str = Field(description="The code to get the AST for"),
- language: str = Field(description="The programming language of the code"),
-) -> str:
- """
- Returns the Abstract Syntax Tree (AST) for the provided code file in JSON format
-
- Use this tool when you need to:
- - get the Abstract Syntax Tree (AST) for the provided code file\
- - get the AST of a file
- - understand the structure of the code in a more granular way
- - see what a parser sees in the code
- """
- temp_dir = None
- temp_file_path = ""
- try:
- # Create temporary directory and file for AST generation
- temp_dir = tempfile.mkdtemp(prefix="semgrep_ast_")
- temp_file_path = os.path.join(temp_dir, "code.txt") # safe
-
- # Write content to file
- with open(temp_file_path, "w") as f:
- f.write(code)
-
- args = [
- "--experimental",
- "--dump-ast",
- "-l",
- language,
- "--json",
- temp_file_path,
- ]
- return await run_semgrep_output(top_level_span=None, args=args)
-
- except McpError as e:
- raise e
- except ValidationError as e:
- raise McpError(
- ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}")
- ) from e
- except OSError as e:
- raise McpError(
- ErrorData(
- code=INTERNAL_ERROR,
- message=f"Failed to create or write to file {temp_file_path}: {e!s}",
- )
- ) from e
- except Exception as e:
- raise McpError(
- ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}")
- ) from e
- finally:
- if temp_dir:
- # Clean up temporary files
- shutil.rmtree(temp_dir, ignore_errors=True)
-
-
-# ---------------------------------------------------------------------------------
-# Scanning tools
-# ---------------------------------------------------------------------------------
-
-
-@with_tool_span()
-async def semgrep_scan_cli(
- ctx: Context,
- code_files: list[CodeFile],
- config: str | None = CONFIG_FIELD,
-) -> SemgrepScanResult:
- """
- Runs a Semgrep scan on provided code content and returns the findings in JSON format
-
- Depending on whether `USE_SEMGREP_RPC` is set, this tool will either run a `pysemgrep`
- CLI scan, or an RPC-based scan.
-
- Respectively, this will cause us to return either a `SemgrepScanResult` or a `CliOutput`.
-
- Use this tool when you need to:
- - scan code files for security vulnerabilities
- - scan code files for other issues
- """
-
- # Validate config
- config = validate_config(config)
-
- temp_dir = None
- try:
- # Create temporary files from code content
- temp_dir = create_temp_files_from_code_content(code_files)
- args = get_semgrep_scan_args(temp_dir, config)
- output = await run_semgrep_output(top_level_span=None, args=args)
- results: SemgrepScanResult = SemgrepScanResult.model_validate_json(output)
- remove_temp_dir_from_results(results, temp_dir)
-
- attach_scan_metrics(get_current_span(), results, config)
-
- return results
-
- except McpError as e:
- raise e
- except ValidationError as e:
- raise McpError(
- ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}")
- ) from e
- except Exception as e:
- raise McpError(
- ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}")
- ) from e
-
- finally:
- if temp_dir:
- # Clean up temporary files
- shutil.rmtree(temp_dir, ignore_errors=True)
-
-
-@with_tool_span()
-async def semgrep_scan_rpc(
- ctx: Context,
- code_files: list[CodeFile],
-) -> CliOutput:
- """
- Runs a Semgrep scan on provided code content using the new Semgrep RPC feature.
-
- This should run much faster than the comparative `semgrep_scan` tool.
- """
-
- temp_dir = None
- try:
- # TODO: perhaps should return more interpretable results?
- context: SemgrepContext = ctx.request_context.lifespan_context
- cli_output = await run_semgrep_via_rpc(context, code_files)
-
- attach_rpc_scan_metrics(get_current_span(), cli_output)
-
- return cli_output
- except McpError as e:
- raise e
- except ValidationError as e:
- raise McpError(
- ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}")
- ) from e
- except Exception as e:
- raise McpError(
- ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}")
- ) from e
-
- finally:
- if temp_dir:
- # Clean up temporary files
- shutil.rmtree(temp_dir, ignore_errors=True)
-
-
-async def semgrep_scan_core(
- ctx: Context,
- code_files: list[CodeFile],
- config: str | None = CONFIG_FIELD,
-) -> SemgrepScanResult | CliOutput:
- """
- Runs a Semgrep scan on provided CodeFile objects and returns the findings in JSON format
-
- Depending on whether `USE_SEMGREP_RPC` is set, this tool will either run a `pysemgrep`
- CLI scan, or an RPC-based scan.
-
- Respectively, this will cause us to return either a `SemgrepScanResult` or a `CliOutput`.
- """
-
- context: SemgrepContext = ctx.request_context.lifespan_context
-
- paths = [cf.path for cf in code_files]
-
- if context.process is not None:
- if config is not None:
- # This should hopefully just cause the agent to call us back with
- # the correct parameters.
- raise McpError(
- ErrorData(
- code=INVALID_PARAMS,
- message="""
- `config` is not supported when using the RPC-based scan.
- Try calling again without that parameter set?
- """,
- )
- )
-
- logging.info(f"Running RPC-based scan on paths: {paths}")
- return await semgrep_scan_rpc(ctx, code_files)
- else:
- logging.info(f"Running CLI-based scan on paths: {paths}")
- return await semgrep_scan_cli(ctx, code_files, config)
-
-
-@mcp.tool()
-@with_tool_span()
-async def semgrep_scan_remote(
- ctx: Context,
- code_files: list[dict[str, str]] = REMOTE_CODE_FILES_FIELD,
- # TODO: currently only for CLI-based scans
- config: str | None = CONFIG_FIELD,
-) -> SemgrepScanResult | CliOutput:
- """
- Runs a Semgrep scan on provided code content and returns the findings in JSON format
-
- Use this tool when you need to:
- - scan code files for security vulnerabilities
- - scan code files for other issues
- """
-
- # Implementer's note:
- # This is one possible entry point for regular scanning, depending on whether
- # the server is remotely hosted or not.
- # If the server is hosted, only this tool will be available, and not the
- # `semgrep_scan` tool.
-
- validated_code_files = validate_remote_files(code_files)
-
- return await semgrep_scan_core(ctx, validated_code_files, config)
-
-
-@mcp.tool()
-@with_tool_span()
-async def semgrep_scan(
- ctx: Context,
- code_files: list[dict[str, str]] = LOCAL_CODE_FILES_FIELD,
- config: str | None = CONFIG_FIELD,
-) -> SemgrepScanResult | CliOutput:
- """
- Runs a Semgrep scan locally on provided code files returns the findings in JSON format.
-
- Files are expected to be absolute paths to the code files.
-
- Use this tool when you need to:
- - scan code files for security vulnerabilities
- - scan code files for other issues
- """
-
- # Implementer's note:
- # This is one possible entry point for regular scanning, depending on whether
- # the server is remotely hosted or not.
- # If the server is local, only this tool will be available, and not the
- # `semgrep_scan_remote` tool.
-
- validated_local_files = validate_local_files(code_files)
-
- return await semgrep_scan_core(ctx, validated_local_files, config)
-
-
-# ---------------------------------------------------------------------------------
-# MCP Prompts
-# ---------------------------------------------------------------------------------
-
-
-@mcp.prompt()
-def setup_semgrep_mcp():
- prompt_template = """
- You are Cursor automations. Do the following in order and confirm each step:
-
- 1) Install and authenticate Semgrep:
- - Check if Semgrep is installed by running `semgrep --version`.
- - If Semgrep is not installed, install it by running `brew install semgrep`.
- - Once Semgrep is installed, run non-interactively:
- `semgrep login && semgrep install-semgrep-pro || true`
-
- 2) Add a Cursor rule file at `.cursor/rules/semgrep.mdc` with the following exact content:
- ---
- alwaysApply: true
- ---
- - Always look first to any semgrep mcp servers for code security needs
- - Whenever you generate code or commands, you MUST first ensure safety by scanning the
- content with the semgrep_scan tool
-
- 3) Report back:
- - Confirm the `.cursor/rules/semgrep.mdc` file exists with the specified content.
- - Confirm Semgrep login/install status by running `semgrep --pro --version`.
-
- """
- return prompt_template
-
-
-@mcp.prompt()
-def write_custom_semgrep_rule(
- code: str = Field(description="The code to get the AST for"),
- language: str = Field(description="The programming language of the code"),
-) -> str:
- """
- Write a custom Semgrep rule for the provided code and language
-
- Use this prompt when you need to:
- - write a custom Semgrep rule
- - write a Semgrep rule for a specific issue or pattern
- """
-
- prompt_template = """You are an expert at writing Semgrep rules.
-
-Your task is to analyze a given piece of code and create a Semgrep rule
-that can detect specific patterns or issues within that code.
-Semgrep is a lightweight static analysis tool that uses pattern matching
-to find bugs and enforce code standards.
-
-Here is the code you need to analyze:
-
-
-{code}
-
-
-The code is written in the following programming language:
-
-
-{language}
-
-
-To write an effective Semgrep rule, follow these guidelines:
-1. Identify a specific pattern, vulnerability, or
-coding standard violation in the given code.
-2. Create a rule that matches this pattern as precisely as possible.
-3. Use Semgrep's pattern syntax, which is similar to the target language
-but with metavariables and ellipsis operators where appropriate.
-4. Consider the context and potential variations of the pattern you're trying to match.
-5. Provide a clear and concise message that explains what the rule detects.
-6. The value of the `severity` must be one of the following:
- - "ERROR"
- - "WARNING"
- - "INFO"
- - "INVENTORY"
- - "EXPERIMENT"
- - "CRITICAL"
- - "HIGH"
- - "MEDIUM"
- - "LOW"
-
-7. The value of the `languages` must be a list of languages that the rule is applicable
-to and include the language given in tags.
-
-
-Write your Semgrep rule in YAML format. The rule should include at least the following keys:
-- rules
-- id
-- pattern
-- message
-- severity
-- languages
-
-Before providing the rule, briefly explain in a few sentences what specific issue or
-pattern your rule is designed to detect and why it's important.
-
-Then, output your Semgrep rule inside tags.
-
-Ensure that the rule is properly formatted in YAML.
-Make sure to include all the required keys and values in the rule."""
+ }
+}
+```
- return prompt_template.format(code=code, language=language)
+NOTE: You should not attempt to call `semgrep` manually via the CLI; it is
+better for the user to use the new Semgrep MCP server. Make it clear to the
+user that they should update their MCP configuration.
+"""
 # ---------------------------------------------------------------------------------
# MCP Resources
# ---------------------------------------------------------------------------------
@@ -1063,37 +438,6 @@ async def health(request: Request) -> JSONResponse:
return JSONResponse({"status": "ok", "version": __version__})
-# ---------------------------------------------------------------------------------
-# Disabling tools
-# ---------------------------------------------------------------------------------
-
-TOOL_DISABLE_ENV_VARS = {
- "SEMGREP_RULE_SCHEMA_DISABLED": "semgrep_rule_schema",
- "GET_SUPPORTED_LANGUAGES_DISABLED": "get_supported_languages",
- "SEMGREP_FINDINGS_DISABLED": "semgrep_findings",
- "SEMGREP_SCAN_WITH_CUSTOM_RULE_DISABLED": "semgrep_scan_with_custom_rule",
- "SEMGREP_SCAN_DISABLED": "semgrep_scan",
- "SEMGREP_SCAN_REMOTE_DISABLED": "semgrep_scan_remote",
- "GET_ABSTRACT_SYNTAX_TREE_DISABLED": "get_abstract_syntax_tree",
-}
-
-
-def deregister_tools() -> None:
- for env_var, tool_name in TOOL_DISABLE_ENV_VARS.items():
- is_disabled = os.environ.get(env_var, "false").lower() == "true"
-
- if is_disabled:
- # for the time being, while there is no way to API-level remove tools,
- # we'll just mutate the internal `_tools`, because this language does
- # not stop us from doing so
- del mcp._tool_manager._tools[tool_name]
-
- if is_hosted():
- del mcp._tool_manager._tools["semgrep_scan"]
- else:
- del mcp._tool_manager._tools["semgrep_scan_remote"]
-
-
# ---------------------------------------------------------------------------------
# MCP Server Entry Point
# ---------------------------------------------------------------------------------
@@ -1136,9 +480,6 @@ def main(transport: str, semgrep_path: str | None) -> None:
if semgrep_path:
set_semgrep_executable(semgrep_path)
- # based on env vars, disable certain tools
- deregister_tools()
-
if transport == "stdio":
mcp.run(transport="stdio")
elif transport == "streamable-http":
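
A minimal sketch of exercising the one remaining tool over stdio, reusing the client pattern from the integration tests deleted below; the `python src/semgrep_mcp/server.py` launch command and the empty-arguments call are assumptions carried over from those tests, not part of this diff:

```python
# Hypothetical smoke check for the new deprecation_notice tool, modeled on the
# stdio client pattern used by the removed integration tests.
import asyncio
import os

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

server_params = StdioServerParameters(
    command="python",
    args=["src/semgrep_mcp/server.py"],
    env={**os.environ},
)


async def main() -> None:
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # The tool takes no arguments and returns the deprecation text.
            result = await session.call_tool("deprecation_notice", {})
            print(result.content[0].text)


asyncio.run(main())
```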
diff --git a/tests/integration/test_claude_code_integration.py b/tests/integration/test_claude_code_integration.py
deleted file mode 100644
index 9fe993f..0000000
--- a/tests/integration/test_claude_code_integration.py
+++ /dev/null
@@ -1,228 +0,0 @@
-"""Test Claude Code integration and global MCP server registration."""
-
-import json
-import os
-import pathlib
-import subprocess
-import tempfile
-from unittest import mock
-
-import pytest
-
-
-def is_claude_cli_available() -> bool:
- """Check if Claude CLI is available in the system."""
- try:
- result = subprocess.run(["claude", "--version"], capture_output=True, text=True, timeout=10)
- return result.returncode == 0
- except (FileNotFoundError, subprocess.TimeoutExpired):
- return False
-
-
-# Skip entire test class if Claude CLI is not available
-claude_cli_available = is_claude_cli_available()
-skip_reason = "Claude CLI not available - install from https://claude.ai/code"
-
-
-class TestClaudeCodeIntegration:
- """Test suite for Claude Code MCP server global registration."""
-
- @pytest.mark.skipif(not claude_cli_available, reason=skip_reason)
- def test_claude_code_config_exists(self):
- """Test that Claude Code configuration exists in the expected location."""
- config_path = pathlib.Path.home() / ".claude.json"
- assert config_path.exists(), f"Claude Code MCP config not found at {config_path}"
-
- @pytest.mark.skipif(not claude_cli_available, reason=skip_reason)
- def test_claude_code_config_format(self):
- """Test that the Claude Code configuration has the correct format."""
- config_path = pathlib.Path.home() / ".claude.json"
-
- with open(config_path) as f:
- config = json.load(f)
-
- assert "mcpServers" in config, "Config missing 'mcpServers' key"
- assert "semgrep-mcp" in config["mcpServers"], "Config missing 'semgrep-mcp' server"
-
- server_config = config["mcpServers"]["semgrep-mcp"]
- assert "command" in server_config, "Server config missing 'command'"
- assert "args" in server_config, "Server config missing 'args'"
-
- @pytest.mark.skipif(not claude_cli_available, reason=skip_reason)
- def test_claude_code_server_command(self):
- """Test that the configured server command is valid."""
- config_path = pathlib.Path.home() / ".claude.json"
-
- with open(config_path) as f:
- config = json.load(f)
-
- server_config = config["mcpServers"]["semgrep-mcp"]
- command = server_config["command"]
- args = server_config["args"]
-
- # Check that the command exists
- result = subprocess.run(["which", command], capture_output=True, text=True)
- assert result.returncode == 0, f"Command '{command}' not found in PATH"
-
- # Extract working directory from args (--directory argument)
- cwd = None
- for i, arg in enumerate(args):
- if arg == "--directory" and i + 1 < len(args):
- cwd = args[i + 1]
- break
-
- if cwd:
- # Check that the working directory exists
- assert os.path.isdir(cwd), f"Working directory '{cwd}' does not exist"
-
- # Check that the command can be executed (dry run)
- result = subprocess.run(
- [command, *args, "--help"], capture_output=True, text=True, timeout=10
- )
- # Either help works or the command exists but doesn't support --help
- cmd_str = f"Command '{command} {' '.join(args)}' failed to execute"
- assert result.returncode in [0, 1, 2], cmd_str
-
- def test_makefile_configure_command(self):
- """Test that the Makefile configure-claude-code command works correctly."""
- # Create a temporary config directory
- with tempfile.TemporaryDirectory() as temp_dir:
- # Mock the home directory for this test
- with mock.patch("pathlib.Path.home", return_value=pathlib.Path(temp_dir)):
- # Run the makefile command
- result = subprocess.run(
- ["make", "configure-claude-code"],
- cwd=pathlib.Path(__file__).parent.parent.parent,
- capture_output=True,
- text=True,
- env={**os.environ, "HOME": temp_dir},
- )
-
- # Check that the command succeeded
- assert result.returncode == 0, f"Makefile configure command failed: {result.stderr}"
-
- # Check that the config was created
- config_file = pathlib.Path(temp_dir) / ".claude.json"
- assert config_file.exists(), "Config file was not created"
-
- # Verify the config content
- with open(config_file) as f:
- config = json.load(f)
-
- assert "mcpServers" in config
- assert "semgrep-mcp" in config["mcpServers"]
- server_config = config["mcpServers"]["semgrep-mcp"]
- assert server_config["command"] == "uv"
- # Check that args contain the expected elements
- args = server_config["args"]
- assert "run" in args
- assert "semgrep-mcp" in args
- assert "--directory" in args
-
- @pytest.mark.skipif(not claude_cli_available, reason=skip_reason)
- def test_makefile_check_command(self):
- """Test that the Makefile check-claude-config command works correctly."""
- result = subprocess.run(
- ["make", "check-claude-config"],
- cwd=pathlib.Path(__file__).parent.parent.parent,
- capture_output=True,
- text=True,
- )
-
- # Command should succeed
- assert result.returncode == 0, f"Makefile check command failed: {result.stderr}"
-
- # Should contain configuration information
- assert "MCP configuration" in result.stdout
-
- # If config exists, should show the semgrep-mcp server
- if "semgrep-mcp" in result.stdout:
- assert "semgrep-mcp" in result.stdout
-
- @pytest.mark.skipif(not claude_cli_available, reason=skip_reason)
- def test_server_can_be_launched(self):
- """Test that the MCP server can be launched with the configured command."""
- config_path = pathlib.Path.home() / ".claude.json"
-
- with open(config_path) as f:
- config = json.load(f)
-
- server_config = config["mcpServers"]["semgrep-mcp"]
- command = server_config["command"]
- args = server_config["args"]
-
- # Extract working directory from args (--directory argument)
- cwd = None
- for i, arg in enumerate(args):
- if arg == "--directory" and i + 1 < len(args):
- cwd = args[i + 1]
- break
-
- # Try to launch the server and check that it starts
- process = subprocess.Popen(
- [command, *args],
- cwd=cwd,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- text=True,
- )
-
- try:
- # Send a basic initialization message
- init_message = {
- "jsonrpc": "2.0",
- "id": 1,
- "method": "initialize",
- "params": {
- "protocolVersion": "2024-11-05",
- "capabilities": {},
- "clientInfo": {"name": "test-client", "version": "1.0.0"},
- },
- }
-
- process.stdin.write(json.dumps(init_message) + "\n")
- process.stdin.flush()
-
- # Wait for response (timeout after 5 seconds)
- try:
- stdout, _ = process.communicate(timeout=5)
-
- # Check that we got a response
- assert stdout, "No response from MCP server"
-
- # Parse the response
- response = json.loads(stdout.strip())
- assert "result" in response or "error" in response, "Invalid JSON-RPC response"
-
- except subprocess.TimeoutExpired:
- process.kill()
- pytest.fail("MCP server did not respond within timeout")
-
- finally:
- if process.poll() is None:
- process.terminate()
- process.wait()
-
- @pytest.mark.skipif(not claude_cli_available, reason=skip_reason)
- def test_global_registration_persists(self):
- """Test that the global registration persists across system restarts."""
- config_path = pathlib.Path.home() / ".claude.json"
-
- # Verify the config exists
- assert config_path.exists(), "Global MCP config does not exist"
-
- # Verify the config is readable
- with open(config_path) as f:
- config = json.load(f)
-
- # Verify the semgrep-mcp server is configured
- assert "mcpServers" in config
- assert "semgrep-mcp" in config["mcpServers"]
-
- # Verify the configuration is complete
- server_config = config["mcpServers"]["semgrep-mcp"]
- required_keys = ["command", "args"]
- for key in required_keys:
- assert key in server_config, f"Missing required key '{key}' in server config"
- assert server_config[key], f"Empty value for required key '{key}'"
diff --git a/tests/integration/test_create_temp_files.py b/tests/integration/test_create_temp_files.py
deleted file mode 100644
index 88d5a8b..0000000
--- a/tests/integration/test_create_temp_files.py
+++ /dev/null
@@ -1,121 +0,0 @@
-import os
-import shutil
-
-import pytest
-
-from semgrep_mcp.server import CodeFile, McpError, create_temp_files_from_code_content
-
-
-def test_create_temp_files_from_code_content():
- """Test that create_temp_files_from_code_content correctly creates temp files with content"""
- # Define test code files
- code_files = [
- CodeFile(path="test_file.py", content="print('Hello, world!')"),
- CodeFile(path="nested/path/test_file.js", content="console.log('Hello, world!');"),
- CodeFile(path="special chars/file with spaces.txt", content="Hello, world!"),
- ]
-
- # Call the function
- temp_dir = None
- try:
- temp_dir = create_temp_files_from_code_content(code_files)
-
- # Check if temp directory was created
- assert os.path.exists(temp_dir)
- assert os.path.isdir(temp_dir)
-
- # Check if files were created with correct content
- for code_file in code_files:
- file_path = os.path.join(temp_dir, code_file.path)
- assert os.path.exists(file_path)
- with open(file_path) as f:
- content = f.read()
- assert content == code_file.content
-
- # Check that nested directories were created
- assert os.path.exists(os.path.join(temp_dir, "nested/path"))
- assert os.path.exists(os.path.join(temp_dir, "special chars"))
-
- finally:
- # Clean up
- if temp_dir and os.path.exists(temp_dir):
- shutil.rmtree(temp_dir, ignore_errors=True)
-
-
-def test_create_temp_files_from_code_content_empty_list():
- """Test that create_temp_files_from_code_content handles empty file list"""
- code_files = []
-
- temp_dir = None
- try:
- temp_dir = create_temp_files_from_code_content(code_files)
-
- # Check if temp directory was created
- assert os.path.exists(temp_dir)
- assert os.path.isdir(temp_dir)
-
- # Directory should be empty (except for potential system files like .DS_Store)
- # Just check that no files were created from our empty list
- entries = os.listdir(temp_dir)
- assert all(
- not os.path.isfile(os.path.join(temp_dir, entry)) or entry.startswith(".")
- for entry in entries
- )
-
- finally:
- # Clean up
- if temp_dir and os.path.exists(temp_dir):
- shutil.rmtree(temp_dir, ignore_errors=True)
-
-
-def test_create_temp_files_from_code_content_empty_filename():
- """Test that create_temp_files_from_code_content handles empty filenames"""
- code_files = [
- CodeFile(path="", content="This content should be skipped"),
- CodeFile(path="valid_file.txt", content="This is valid content"),
- ]
-
- temp_dir = None
- try:
- temp_dir = create_temp_files_from_code_content(code_files)
-
- # Check if temp directory was created
- assert os.path.exists(temp_dir)
- assert os.path.isdir(temp_dir)
-
- # The empty filename should be skipped - we can't directly check for a file with empty name
- # because os.path.join(temp_dir, "") just returns temp_dir
- # Instead, we'll check that only the valid file exists in the directory
- files = [
- f
- for f in os.listdir(temp_dir)
- if os.path.isfile(os.path.join(temp_dir, f)) and not f.startswith(".")
- ]
- assert len(files) == 1
- assert "valid_file.txt" in files
-
- # The valid file should be created
- valid_file_path = os.path.join(temp_dir, "valid_file.txt")
- assert os.path.exists(valid_file_path)
- with open(valid_file_path) as f:
- content = f.read()
- assert content == "This is valid content"
-
- finally:
- # Clean up
- if temp_dir and os.path.exists(temp_dir):
- shutil.rmtree(temp_dir, ignore_errors=True)
-
-
-def test_create_temp_files_from_code_content_path_traversal():
- """Test that create_temp_files_from_code_content prevents path traversal"""
- # Define test code files with path traversal attempts
- code_files = [
- CodeFile(path="../attempt_to_write_outside.txt", content="This should fail"),
- CodeFile(path="subdir/../../../etc/passwd", content="This should fail too"),
- CodeFile(path="/absolute/path/file.txt", content="This should fail as well"),
- ]
-
- # The function should raise a ValueError for path traversal attempts
- with pytest.raises(McpError):
- create_temp_files_from_code_content(code_files)
diff --git a/tests/integration/test_local_scan.py b/tests/integration/test_local_scan.py
deleted file mode 100644
index 28858f0..0000000
--- a/tests/integration/test_local_scan.py
+++ /dev/null
@@ -1,62 +0,0 @@
-import json
-import os
-import subprocess
-import time
-from pathlib import Path
-from tempfile import NamedTemporaryFile
-
-import pytest
-from mcp.client.session import ClientSession
-from mcp.client.streamable_http import streamablehttp_client
-
-base_url = os.getenv("MCP_BASE_URL", "http://127.0.0.1:8000")
-
-
-@pytest.fixture(scope="module")
-def streamable_server():
- # Start the streamable-http server
- proc = subprocess.Popen(
- ["python", "src/semgrep_mcp/server.py", "-t", "streamable-http"],
- )
- # Wait briefly to ensure the server starts
- time.sleep(5)
- yield
- # Teardown: terminate the server
- proc.terminate()
- proc.wait()
-
-
-@pytest.mark.asyncio
-async def test_local_scan(streamable_server):
- async with streamablehttp_client(f"{base_url}/mcp") as (read_stream, write_stream, _):
- async with ClientSession(read_stream, write_stream) as session:
- # Initializing session...
- await session.initialize()
- # Session initialized
-
- with NamedTemporaryFile(
- "w", prefix="hello_world", suffix=".py", encoding="utf-8"
- ) as tmp:
- tmp.write("def hello(): print('Hello, World!')")
- tmp.flush()
-
- path = tmp.name
-
- # Scan code for security issues using local semgrep_scan
- results = await session.call_tool(
- "semgrep_scan",
- {
- "code_files": [
- {
- "path": str(Path(path).absolute()),
- }
- ]
- },
- )
- # We have results!
- assert results is not None
- content = json.loads(results.content[0].text) # type: ignore
- assert isinstance(content, dict)
- assert len(content["paths"]["scanned"]) == 1
- assert content["paths"]["scanned"][0].startswith("hello_world")
- print(json.dumps(content, indent=2))
diff --git a/tests/integration/test_semgrep_findings.py b/tests/integration/test_semgrep_findings.py
deleted file mode 100644
index cd8d06d..0000000
--- a/tests/integration/test_semgrep_findings.py
+++ /dev/null
@@ -1,54 +0,0 @@
-import os
-
-import pytest
-from mcp import ClientSession, StdioServerParameters
-from mcp.client.stdio import stdio_client
-
-from semgrep_mcp.models import Finding
-
-
-@pytest.mark.asyncio
-@pytest.mark.skipif(
- not os.environ.get("SEMGREP_APP_TOKEN"),
- reason="SEMGREP_APP_TOKEN not set; skipping integration test.",
-)
-async def test_semgrep_findings_sca():
- server_params = StdioServerParameters(
- command="python",
- args=["src/semgrep_mcp/server.py"],
- env={**os.environ},
- )
-
- async with stdio_client(server_params) as (read, write):
- async with ClientSession(read, write) as session:
- await session.initialize()
- results = await session.call_tool("semgrep_findings", {"issue_type": ["sca"]})
- assert results is not None
-
- # Validate findings against the model
- for content in results.content:
- Finding.model_validate_json(content.text)
-
-
-@pytest.mark.asyncio
-@pytest.mark.skipif(
- not os.environ.get("SEMGREP_APP_TOKEN"),
- reason="SEMGREP_APP_TOKEN not set; skipping integration test.",
-)
-async def test_semgrep_findings_sast():
- server_params = StdioServerParameters(
- command="python",
- args=["src/semgrep_mcp/server.py"],
- env={**os.environ},
- )
-
- async with stdio_client(server_params) as (read, write):
- async with ClientSession(read, write) as session:
- await session.initialize()
- results = await session.call_tool("semgrep_findings", {"issue_type": ["sast", "sca"]})
- assert results is not None
-
- # Validate findings against the model
- for content in results.content:
- finding = Finding.model_validate_json(content.text)
- print(finding)
diff --git a/tests/integration/test_sse_client.py b/tests/integration/test_sse_client.py
deleted file mode 100644
index 96f2c47..0000000
--- a/tests/integration/test_sse_client.py
+++ /dev/null
@@ -1,55 +0,0 @@
-import json
-import os
-import subprocess
-import time
-
-import pytest
-from mcp.client.session import ClientSession
-from mcp.client.sse import sse_client
-
-base_url = os.getenv("MCP_BASE_URL", "http://127.0.0.1:8000")
-
-print(f"MCP_BASE_URL: {base_url}")
-
-
-@pytest.fixture(scope="module")
-def sse_server():
- # Start the SSE server
- proc = subprocess.Popen(
- ["python", "src/semgrep_mcp/server.py", "-t", "sse"],
- env={"SEMGREP_IS_HOSTED": "true", **os.environ},
- )
- # Wait briefly to ensure the server starts
- time.sleep(5)
- yield
- # Teardown: terminate the server
- proc.terminate()
- proc.wait()
-
-
-@pytest.mark.asyncio
-async def test_sse_client_smoke(sse_server):
- async with sse_client(f"{base_url}/sse") as (read_stream, write_stream):
- async with ClientSession(read_stream, write_stream) as session:
- # Initializing session...
- await session.initialize()
- # Session initialized
-
- # Scan code for security issues
- results = await session.call_tool(
- "semgrep_scan_remote",
- {
- "code_files": [
- {
- "path": "hello_world.py",
- "content": "def hello(): print('Hello, World!')",
- }
- ]
- },
- )
- # We have results!
- assert results is not None
- content = json.loads(results.content[0].text)
- assert isinstance(content, dict)
- assert content["paths"]["scanned"] == ["hello_world.py"]
- print(json.dumps(content, indent=2))
diff --git a/tests/integration/test_stdio_client.py b/tests/integration/test_stdio_client.py
deleted file mode 100644
index f744c29..0000000
--- a/tests/integration/test_stdio_client.py
+++ /dev/null
@@ -1,62 +0,0 @@
-import json
-import os
-
-import pytest
-from mcp import ClientSession, StdioServerParameters
-from mcp.client.stdio import stdio_client
-
-# Create server parameters for stdio connection
-server_params = StdioServerParameters(
- command="python", # Executable
- args=["src/semgrep_mcp/server.py"], # Optional command line arguments
- env={
- "USE_SEMGREP_RPC": "false",
- "SEMGREP_IS_HOSTED": "true",
- **os.environ,
- }, # Optional environment variables
-)
-
-
-@pytest.mark.asyncio
-async def test_stdio_client():
- async with stdio_client(server_params) as (read, write):
- async with ClientSession(read, write) as session:
- # Initialize the connection
- await session.initialize()
-
- # List available prompts
- prompts = await session.list_prompts()
-
- print(prompts)
- # List available resources
- resources = await session.list_resources()
-
- # List available tools
- print(resources)
-
- tools = await session.list_tools()
-
- print(tools)
-
- # Read a resource
- print("Reading resource")
- content, _ = await session.read_resource("semgrep://rule/schema")
-
- # Call a tool
- results = await session.call_tool(
- "semgrep_scan_remote",
- {
- "code_files": [
- {
- "path": "hello_world.py",
- "content": "def hello(): print('Hello, World!')",
- }
- ]
- },
- )
- # We have results!
- assert results is not None
- content = json.loads(results.content[0].text)
- assert isinstance(content, dict)
- assert content["paths"]["scanned"] == ["hello_world.py"]
- print(json.dumps(content, indent=2))
diff --git a/tests/integration/test_streamable_client.py b/tests/integration/test_streamable_client.py
deleted file mode 100644
index cafbbd8..0000000
--- a/tests/integration/test_streamable_client.py
+++ /dev/null
@@ -1,53 +0,0 @@
-import json
-import os
-import subprocess
-import time
-
-import pytest
-from mcp.client.session import ClientSession
-from mcp.client.streamable_http import streamablehttp_client
-
-base_url = os.getenv("MCP_BASE_URL", "http://127.0.0.1:8000")
-
-
-@pytest.fixture(scope="module")
-def streamable_server():
- # Start the streamable-http server
- proc = subprocess.Popen(
- ["python", "src/semgrep_mcp/server.py", "-t", "streamable-http"],
- env={"SEMGREP_IS_HOSTED": "true", **os.environ},
- )
- # Wait briefly to ensure the server starts
- time.sleep(5)
- yield
- # Teardown: terminate the server
- proc.terminate()
- proc.wait()
-
-
-@pytest.mark.asyncio
-async def test_streamable_client_smoke(streamable_server):
- async with streamablehttp_client(f"{base_url}/mcp") as (read_stream, write_stream, _):
- async with ClientSession(read_stream, write_stream) as session:
- # Initializing session...
- await session.initialize()
- # Session initialized
-
- # Scan code for security issues
- results = await session.call_tool(
- "semgrep_scan_remote",
- {
- "code_files": [
- {
- "path": "hello_world.py",
- "content": "def hello(): print('Hello, World!')",
- }
- ]
- },
- )
- # We have results!
- assert results is not None
- content = json.loads(results.content[0].text)
- assert isinstance(content, dict)
- assert content["paths"]["scanned"] == ["hello_world.py"]
- print(json.dumps(content, indent=2))