From c07d6bf734f3c7ef4544e643eba2dcc187158a35 Mon Sep 17 00:00:00 2001 From: Prince Roshan Date: Sun, 23 Nov 2025 17:45:26 +0530 Subject: [PATCH 1/3] redesign config subsystem --- mcp_fuzzer/config/__init__.py | 6 +- mcp_fuzzer/config/discovery.py | 45 ++ mcp_fuzzer/config/loader.py | 434 +++--------------- mcp_fuzzer/config/parser.py | 49 ++ mcp_fuzzer/config/schema.py | 195 ++++++++ mcp_fuzzer/config/transports.py | 74 +++ tests/unit/config/test_config_loader.py | 295 ++++++------ .../unit/config/test_discovery_transports.py | 104 +++++ 8 files changed, 683 insertions(+), 519 deletions(-) create mode 100644 mcp_fuzzer/config/discovery.py create mode 100644 mcp_fuzzer/config/parser.py create mode 100644 mcp_fuzzer/config/schema.py create mode 100644 mcp_fuzzer/config/transports.py create mode 100644 tests/unit/config/test_discovery_transports.py diff --git a/mcp_fuzzer/config/__init__.py b/mcp_fuzzer/config/__init__.py index 39b8f1c..b83f404 100644 --- a/mcp_fuzzer/config/__init__.py +++ b/mcp_fuzzer/config/__init__.py @@ -33,6 +33,7 @@ # Import loader functions from .loader import ( + ConfigLoader, find_config_file, load_config_file, apply_config_file, @@ -67,10 +68,11 @@ "DEFAULT_FORCE_KILL_TIMEOUT", # Manager "config", - # Loader functions + # Loader helpers + "ConfigLoader", "find_config_file", "load_config_file", "apply_config_file", "get_config_schema", "load_custom_transports", -] \ No newline at end of file +] diff --git a/mcp_fuzzer/config/discovery.py b/mcp_fuzzer/config/discovery.py new file mode 100644 index 0000000..4534f39 --- /dev/null +++ b/mcp_fuzzer/config/discovery.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 +"""Helpers for locating configuration files.""" + +from __future__ import annotations + +import os +from pathlib import Path + + +def find_config_file( + config_path: str | None = None, + search_paths: list[str] | None = None, + file_names: list[str] | None = None, +) -> str | None: + """Find a configuration file in the given paths. + + Args: + config_path: Explicit path to config file, takes precedence if provided + search_paths: List of directories to search for config files + file_names: List of file names to search for + + Returns: + Path to the found config file or None if not found + """ + if config_path and os.path.isfile(config_path): + return config_path + + if search_paths is None: + search_paths = [ + os.getcwd(), + str(Path.home() / ".config" / "mcp-fuzzer"), + ] + + if file_names is None: + file_names = ["mcp-fuzzer.yml", "mcp-fuzzer.yaml"] + + for path in search_paths: + if not os.path.isdir(path): + continue + for name in file_names: + file_path = os.path.join(path, name) + if os.path.isfile(file_path): + return file_path + + return None diff --git a/mcp_fuzzer/config/loader.py b/mcp_fuzzer/config/loader.py index fbc21ee..876d283 100644 --- a/mcp_fuzzer/config/loader.py +++ b/mcp_fuzzer/config/loader.py @@ -1,384 +1,92 @@ #!/usr/bin/env python3 -"""Configuration file loader for MCP Fuzzer. +"""Configuration loader helpers that glue the discovery/parser stack.""" -This module provides functionality to load configuration from YAML files. -""" +from __future__ import annotations -import os import logging -from pathlib import Path -from typing import Any - -import yaml +from typing import Any, Callable, Tuple +from .discovery import find_config_file from .manager import config +from .parser import load_config_file +from .schema import get_config_schema # noqa: F401 (exported for consumers) +from .transports import load_custom_transports from ..exceptions import ConfigFileError, MCPError -from ..transport.custom import register_custom_transport -from ..transport.base import TransportProtocol -import importlib logger = logging.getLogger(__name__) -def find_config_file( - config_path: str | None = None, - search_paths: list[str] | None = None, - file_names: list[str] | None = None, -) -> str | None: - """Find a configuration file in the given paths. - - Args: - config_path: Explicit path to config file, takes precedence if provided - search_paths: List of directories to search for config files - file_names: List of file names to search for - - Returns: - Path to the found config file or None if not found - """ - # If explicit path is provided, use it - if config_path and os.path.isfile(config_path): - return config_path - - # Default search paths - if search_paths is None: - search_paths = [ - os.getcwd(), # Current directory - str(Path.home() / ".config" / "mcp-fuzzer"), # User config directory - ] - - # Default file names - if file_names is None: - file_names = ["mcp-fuzzer.yml", "mcp-fuzzer.yaml"] - - # Search for config files - for path in search_paths: - if not os.path.isdir(path): - continue - - for name in file_names: - file_path = os.path.join(path, name) - if os.path.isfile(file_path): - return file_path - - return None - -def load_config_file(file_path: str) -> dict[str, Any]: - """Load configuration from a YAML file. +ConfigDict = dict[str, Any] +FileDiscoverer = Callable[ + [str | None, list[str] | None, list[str] | None], str | None +] +ConfigParser = Callable[[str], ConfigDict] +TransportLoader = Callable[[ConfigDict], None] + + +class ConfigLoader: + """Load configuration files with injectable discovery and parser implementations.""" + + def __init__( + self, + discoverer: FileDiscoverer | None = None, + parser: ConfigParser | None = None, + transport_loader: TransportLoader | None = None, + ): + self.discoverer = discoverer or find_config_file + self.parser = parser or load_config_file + self.transport_loader = transport_loader or load_custom_transports + + def load( + self, + config_path: str | None = None, + search_paths: list[str] | None = None, + file_names: list[str] | None = None, + ) -> Tuple[ConfigDict | None, str | None]: + """Return the configuration dictionary and source file path.""" + file_path = self.discoverer(config_path, search_paths, file_names) + if not file_path: + logger.debug("No configuration file found") + return None, None + + logger.info("Loading configuration from %s", file_path) + try: + config_data = self.parser(file_path) + self.transport_loader(config_data) + except (ConfigFileError, MCPError): + logger.exception("Failed to load configuration from %s", file_path) + raise - Args: - file_path: Path to the configuration file + return config_data, file_path - Returns: - Dictionary containing the configuration + def apply( + self, + config_path: str | None = None, + search_paths: list[str] | None = None, + file_names: list[str] | None = None, + ) -> bool: + """Load configuration and merge it into the runtime state.""" + try: + config_data, file_path = self.load(config_path, search_paths, file_names) + except (ConfigFileError, MCPError): + return False - Raises: - ConfigFileError: If the file cannot be found, parsed, or has permission issues - """ - if not os.path.isfile(file_path): - raise ConfigFileError(f"Configuration file not found: {file_path}") + if not file_path: + return False - # Verify file extension - if not file_path.endswith((".yml", ".yaml")): - raise ConfigFileError( - f"Unsupported configuration file format: {file_path}. " - "Only YAML files with .yml or .yaml extensions are supported." - ) + config.update(config_data or {}) + return True - try: - with open(file_path, "r", encoding="utf-8") as f: - return yaml.safe_load(f) or {} - except yaml.YAMLError as e: - raise ConfigFileError( - f"Error parsing YAML configuration file {file_path}: {str(e)}" - ) - except PermissionError: - raise ConfigFileError( - f"Permission denied when reading configuration file: {file_path}" - ) - except Exception as e: - raise ConfigFileError( - f"Unexpected error reading configuration file {file_path}: {str(e)}" - ) def apply_config_file( config_path: str | None = None, search_paths: list[str] | None = None, file_names: list[str] | None = None, ) -> bool: - """Find and apply configuration from a file. - - Args: - config_path: Explicit path to config file, takes precedence if provided - search_paths: List of directories to search for config files - file_names: List of file names to search for - - Returns: - True if configuration was loaded and applied, False otherwise - """ - # Find config file - file_path = find_config_file(config_path, search_paths, file_names) - if not file_path: - logger.debug("No configuration file found") - return False - - logger.info(f"Loading configuration from {file_path}") - try: - config_data = load_config_file(file_path) - load_custom_transports(config_data) - except (ConfigFileError, MCPError): - logger.exception("Failed to load configuration from %s", file_path) - return False - config.update(config_data) - return True - -def get_config_schema() -> dict[str, Any]: - """Get the configuration schema. - - Returns: - Dictionary describing the configuration schema - """ - return { - "type": "object", - "properties": { - "timeout": {"type": "number", "description": "Default timeout in seconds"}, - "tool_timeout": { - "type": "number", - "description": "Tool-specific timeout in seconds", - }, - "log_level": { - "type": "string", - "enum": ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], - }, - "safety_enabled": { - "type": "boolean", - "description": "Whether safety features are enabled", - }, - "fs_root": { - "type": "string", - "description": "Root directory for file operations", - }, - "http_timeout": { - "type": "number", - "description": "HTTP transport timeout in seconds", - }, - "sse_timeout": { - "type": "number", - "description": "SSE transport timeout in seconds", - }, - "stdio_timeout": { - "type": "number", - "description": "STDIO transport timeout in seconds", - }, - "mode": {"type": "string", "enum": ["tools", "tool", "protocol", "both"]}, - "phase": {"type": "string", "enum": ["realistic", "aggressive", "both"]}, - "protocol": { - "type": "string", - "enum": ["http", "https", "sse", "stdio", "streamablehttp"], - }, - "endpoint": {"type": "string", "description": "Server endpoint URL"}, - "runs": {"type": "integer", "description": "Number of fuzzing runs"}, - "runs_per_type": { - "type": "integer", - "description": "Number of runs per protocol type", - }, - "protocol_type": { - "type": "string", - "description": "Specific protocol type to fuzz", - }, - "no_network": {"type": "boolean", "description": "Disable network access"}, - "allow_hosts": { - "type": "array", - "items": {"type": "string"}, - "description": "List of allowed hosts", - }, - "max_concurrency": { - "type": "integer", - "description": "Maximum concurrent operations", - }, - "auth": { - "type": "object", - "properties": { - "providers": { - "type": "array", - "items": { - "type": "object", - "properties": { - "type": { - "type": "string", - "enum": ["api_key", "basic", "oauth", "custom"], - }, - "id": {"type": "string"}, - "config": {"type": "object"}, - }, - "required": ["type", "id"], - }, - }, - "mappings": { - "type": "object", - "additionalProperties": {"type": "string"}, - }, - }, - }, - "custom_transports": { - "type": "object", - "description": "Configuration for custom transport mechanisms", - "patternProperties": { - "^[a-zA-Z][a-zA-Z0-9_]*$": { - "type": "object", - "properties": { - "module": { - "type": "string", - "description": "Python module containing transport", - }, - "class": { - "type": "string", - "description": "Transport class name", - }, - "description": { - "type": "string", - "description": "Human-readable description", - }, - "factory": { - "type": "string", - "description": "Dotted path to factory function " - "(e.g., pkg.mod.create_transport)", - }, - "config_schema": { - "type": "object", - "description": "JSON schema for transport config", - }, - }, - "additionalProperties": False, - "required": ["module", "class"], - } - }, - "additionalProperties": False, - }, - "safety": { - "type": "object", - "properties": { - "enabled": {"type": "boolean"}, - "local_hosts": {"type": "array", "items": {"type": "string"}}, - "no_network": {"type": "boolean"}, - "header_denylist": {"type": "array", "items": {"type": "string"}}, - "proxy_env_denylist": { - "type": "array", - "items": {"type": "string"}, - }, - "env_allowlist": {"type": "array", "items": {"type": "string"}}, - }, - }, - "output": { - "type": "object", - "properties": { - "format": { - "type": "string", - "enum": ["json", "yaml", "csv", "xml"], - "description": "Output format for standardized reports", - }, - "directory": { - "type": "string", - "description": "Directory to save output files", - }, - "compress": { - "type": "boolean", - "description": "Whether to compress output files", - }, - "types": { - "type": "array", - "items": { - "type": "string", - "enum": [ - "fuzzing_results", - "error_report", - "safety_summary", - "performance_metrics", - "configuration_dump", - ], - }, - "description": "Specific output types to generate", - }, - "schema": { - "type": "string", - "description": "Path to custom output schema file", - }, - "retention": { - "type": "object", - "properties": { - "days": { - "type": "integer", - "description": "Number of days to retain output files", - }, - "max_size": { - "type": "string", - "description": ( - "Maximum size of output directory " - "(e.g., '1GB', '500MB')" - ), - }, - }, - }, - }, - }, - }, - } - -def load_custom_transports(config_data: dict[str, Any]) -> None: - """Load and register custom transports from configuration. - - Args: - config_data: Configuration dictionary containing custom_transports section - """ - custom_transports = config_data.get("custom_transports", {}) - - for transport_name, transport_config in custom_transports.items(): - try: - # Import the module - module_path = transport_config["module"] - class_name = transport_config["class"] - - module = importlib.import_module(module_path) - transport_class = getattr(module, class_name) - if not isinstance(transport_class, type): - raise ConfigFileError(f"{module_path}.{class_name} is not a class") - if not issubclass(transport_class, TransportProtocol): - raise ConfigFileError( - f"{module_path}.{class_name} must subclass TransportProtocol" - ) - - # Register the transport - description = transport_config.get("description", "") - config_schema = transport_config.get("config_schema") - factory_fn = None - factory_path = transport_config.get("factory") - if factory_path: - try: - mod_path, attr = factory_path.rsplit(".", 1) - except ValueError as ve: - raise ConfigFileError( - f"Invalid factory path '{factory_path}'; expected 'module.attr'" - ) from ve - fmod = importlib.import_module(mod_path) - factory_fn = getattr(fmod, attr) - if not callable(factory_fn): - raise ConfigFileError(f"Factory '{factory_path}' is not callable") - - register_custom_transport( - name=transport_name, - transport_class=transport_class, - description=description, - config_schema=config_schema, - factory_function=factory_fn, - ) - - logger.info( - f"Loaded custom transport '{transport_name}' from " - f"{module_path}.{class_name}" - ) - - except MCPError: - raise - except Exception as e: - logger.error(f"Failed to load custom transport '{transport_name}': {e}") - raise ConfigFileError( - f"Failed to load custom transport '{transport_name}': {e}" - ) from e + """Convenience helper that uses the default loader to update global config.""" + + loader = ConfigLoader() + return loader.apply( + config_path=config_path, + search_paths=search_paths, + file_names=file_names, + ) diff --git a/mcp_fuzzer/config/parser.py b/mcp_fuzzer/config/parser.py new file mode 100644 index 0000000..a04b1ce --- /dev/null +++ b/mcp_fuzzer/config/parser.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 +"""YAML configuration parsing utilities.""" + +from __future__ import annotations + +import os +from typing import Any + +import yaml + +from ..exceptions import ConfigFileError + + +def load_config_file(file_path: str) -> dict[str, Any]: + """Load configuration from a YAML file. + + Args: + file_path: Path to the configuration file + + Returns: + Dictionary containing the configuration + + Raises: + ConfigFileError: If the file cannot be found, parsed, or has permission issues + """ + if not os.path.isfile(file_path): + raise ConfigFileError(f"Configuration file not found: {file_path}") + + if not file_path.endswith((".yml", ".yaml")): + raise ConfigFileError( + f"Unsupported configuration file format: {file_path}. " + "Only YAML files with .yml or .yaml extensions are supported." + ) + + try: + with open(file_path, "r", encoding="utf-8") as f: + return yaml.safe_load(f) or {} + except yaml.YAMLError as e: + raise ConfigFileError( + f"Error parsing YAML configuration file {file_path}: {str(e)}" + ) + except PermissionError: + raise ConfigFileError( + f"Permission denied when reading configuration file: {file_path}" + ) + except Exception as e: + raise ConfigFileError( + f"Unexpected error reading configuration file {file_path}: {str(e)}" + ) diff --git a/mcp_fuzzer/config/schema.py b/mcp_fuzzer/config/schema.py new file mode 100644 index 0000000..fda83e1 --- /dev/null +++ b/mcp_fuzzer/config/schema.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python3 +"""Schema helpers for configuration validation.""" + +from __future__ import annotations + +from typing import Any + + +def get_config_schema() -> dict[str, Any]: + """Return the JSON schema describing the configuration structure.""" + return { + "type": "object", + "properties": { + "timeout": {"type": "number", "description": "Default timeout in seconds"}, + "tool_timeout": { + "type": "number", + "description": "Tool-specific timeout in seconds", + }, + "log_level": { + "type": "string", + "enum": ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + }, + "safety_enabled": { + "type": "boolean", + "description": "Whether safety features are enabled", + }, + "fs_root": { + "type": "string", + "description": "Root directory for file operations", + }, + "http_timeout": { + "type": "number", + "description": "HTTP transport timeout in seconds", + }, + "sse_timeout": { + "type": "number", + "description": "SSE transport timeout in seconds", + }, + "stdio_timeout": { + "type": "number", + "description": "STDIO transport timeout in seconds", + }, + "mode": {"type": "string", "enum": ["tools", "protocol", "both"]}, + "phase": {"type": "string", "enum": ["realistic", "aggressive", "both"]}, + "protocol": { + "type": "string", + "enum": ["http", "https", "sse", "stdio", "streamablehttp"], + }, + "endpoint": {"type": "string", "description": "Server endpoint URL"}, + "runs": {"type": "integer", "description": "Number of fuzzing runs"}, + "runs_per_type": { + "type": "integer", + "description": "Number of runs per protocol type", + }, + "protocol_type": { + "type": "string", + "description": "Specific protocol type to fuzz", + }, + "no_network": {"type": "boolean", "description": "Disable network access"}, + "allow_hosts": { + "type": "array", + "items": {"type": "string"}, + "description": "List of allowed hosts", + }, + "max_concurrency": { + "type": "integer", + "description": "Maximum concurrent operations", + }, + "auth": { + "type": "object", + "properties": { + "providers": { + "type": "array", + "items": { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": ["api_key", "basic", "oauth", "custom"], + }, + "id": {"type": "string"}, + "config": {"type": "object"}, + }, + "required": ["type", "id"], + }, + }, + "mappings": { + "type": "object", + "additionalProperties": {"type": "string"}, + }, + }, + }, + "custom_transports": { + "type": "object", + "description": "Configuration for custom transport mechanisms", + "patternProperties": { + "^[a-zA-Z][a-zA-Z0-9_]*$": { + "type": "object", + "properties": { + "module": { + "type": "string", + "description": "Python module containing transport", + }, + "class": { + "type": "string", + "description": "Transport class name", + }, + "description": { + "type": "string", + "description": "Human-readable description", + }, + "factory": { + "type": "string", + "description": "Dotted path to factory function " + "(e.g., pkg.mod.create_transport)", + }, + "config_schema": { + "type": "object", + "description": "JSON schema for transport config", + }, + }, + "additionalProperties": False, + "required": ["module", "class"], + } + }, + "additionalProperties": False, + }, + "safety": { + "type": "object", + "properties": { + "enabled": {"type": "boolean"}, + "local_hosts": {"type": "array", "items": {"type": "string"}}, + "no_network": {"type": "boolean"}, + "header_denylist": {"type": "array", "items": {"type": "string"}}, + "proxy_env_denylist": { + "type": "array", + "items": {"type": "string"}, + }, + "env_allowlist": {"type": "array", "items": {"type": "string"}}, + }, + }, + "output": { + "type": "object", + "properties": { + "format": { + "type": "string", + "enum": ["json", "yaml", "csv", "xml"], + "description": "Output format for standardized reports", + }, + "directory": { + "type": "string", + "description": "Directory to save output files", + }, + "compress": { + "type": "boolean", + "description": "Whether to compress output files", + }, + "types": { + "type": "array", + "items": { + "type": "string", + "enum": [ + "fuzzing_results", + "error_report", + "safety_summary", + "performance_metrics", + "configuration_dump", + ], + }, + "description": "Specific output types to generate", + }, + "schema": { + "type": "string", + "description": "Path to custom output schema file", + }, + "retention": { + "type": "object", + "properties": { + "days": { + "type": "integer", + "description": "Number of days to retain output files", + }, + "max_size": { + "type": "string", + "description": ( + "Maximum size of output directory " + "(e.g., '1GB', '500MB')" + ), + }, + }, + }, + }, + }, + }, + } diff --git a/mcp_fuzzer/config/transports.py b/mcp_fuzzer/config/transports.py new file mode 100644 index 0000000..af92289 --- /dev/null +++ b/mcp_fuzzer/config/transports.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +"""Custom transport registration helpers.""" + +from __future__ import annotations + +import importlib +import logging +from typing import Any + +from ..exceptions import ConfigFileError, MCPError +from ..transport.custom import register_custom_transport +from ..transport.base import TransportProtocol + +logger = logging.getLogger(__name__) + + +def load_custom_transports(config_data: dict[str, Any]) -> None: + """Load and register custom transports from configuration. + + Args: + config_data: Configuration dictionary containing custom_transports section + """ + custom_transports = config_data.get("custom_transports", {}) + + for transport_name, transport_config in custom_transports.items(): + try: + module_path = transport_config["module"] + class_name = transport_config["class"] + + module = importlib.import_module(module_path) + transport_class = getattr(module, class_name) + if not isinstance(transport_class, type): + raise ConfigFileError(f"{module_path}.{class_name} is not a class") + if not issubclass(transport_class, TransportProtocol): + raise ConfigFileError( + f"{module_path}.{class_name} must subclass TransportProtocol" + ) + + description = transport_config.get("description", "") + config_schema = transport_config.get("config_schema") + factory_fn = None + factory_path = transport_config.get("factory") + if factory_path: + try: + mod_path, attr = factory_path.rsplit(".", 1) + except ValueError as ve: + raise ConfigFileError( + f"Invalid factory path '{factory_path}'; expected 'module.attr'" + ) from ve + fmod = importlib.import_module(mod_path) + factory_fn = getattr(fmod, attr) + if not callable(factory_fn): + raise ConfigFileError(f"Factory '{factory_path}' is not callable") + + register_custom_transport( + name=transport_name, + transport_class=transport_class, + description=description, + config_schema=config_schema, + factory_function=factory_fn, + ) + + logger.info( + f"Loaded custom transport '{transport_name}' from " + f"{module_path}.{class_name}" + ) + + except MCPError: + raise + except Exception as e: + logger.error(f"Failed to load custom transport '{transport_name}': {e}") + raise ConfigFileError( + f"Failed to load custom transport '{transport_name}': {e}" + ) from e diff --git a/tests/unit/config/test_config_loader.py b/tests/unit/config/test_config_loader.py index 50f08df..5a992dc 100644 --- a/tests/unit/config/test_config_loader.py +++ b/tests/unit/config/test_config_loader.py @@ -1,203 +1,190 @@ #!/usr/bin/env python3 -"""Tests for the configuration loader module.""" +"""Unit tests that exercise the new config loader helpers.""" + +from __future__ import annotations import os -import tempfile +from pathlib import Path +from textwrap import dedent +from typing import Any + import pytest -from unittest.mock import patch, mock_open +from unittest.mock import Mock, patch from mcp_fuzzer.config import ( - find_config_file, - load_config_file, + ConfigLoader, apply_config_file, + find_config_file, get_config_schema, - config, + load_config_file, ) -from mcp_fuzzer.exceptions import ConfigFileError, ValidationError +from mcp_fuzzer.exceptions import ConfigFileError @pytest.fixture -def config_files(): - """Create temporary YAML files for testing.""" - # Create temporary directory - temp_dir = tempfile.TemporaryDirectory() - - # YAML content - yaml_content = """ -timeout: 60.0 -log_level: "DEBUG" -safety: - enabled: true - no_network: false - local_hosts: - - "localhost" - - "127.0.0.1" -""" - # Create .yml file - yml_path = os.path.join(temp_dir.name, "mcp-fuzzer.yml") - with open(yml_path, "w") as f: - f.write(yaml_content) - - # Create .yaml file - yaml_path = os.path.join(temp_dir.name, "mcp-fuzzer.yaml") - with open(yaml_path, "w") as f: - f.write(yaml_content) - - # Return paths - yield { - "temp_dir": temp_dir, - "yml_path": yml_path, - "yaml_path": yaml_path, +def config_files(tmp_path: Path) -> dict[str, Any]: + """Create a couple of temporary YAML config files for reuse.""" + content = dedent( + """ + timeout: 60.0 + log_level: "DEBUG" + safety: + enabled: true + no_network: false + local_hosts: + - "localhost" + """ + ).strip() + + yml_path = tmp_path / "mcp-fuzzer.yml" + yaml_path = tmp_path / "mcp-fuzzer.yaml" + yml_path.write_text(content) + yaml_path.write_text(content) + + return { + "temp_dir": tmp_path, + "yml_path": str(yml_path), + "yaml_path": str(yaml_path), } - # Cleanup - temp_dir.cleanup() - def test_find_config_file_explicit_path(config_files): - """Test finding a config file with an explicit path.""" - # Test with explicit path - found_path = find_config_file(config_path=config_files["yaml_path"]) - assert found_path == config_files["yaml_path"] - - # Test with non-existent path - found_path = find_config_file(config_path="/non/existent/path") - assert found_path is None + result = find_config_file(config_path=config_files["yaml_path"]) + assert result == config_files["yaml_path"] def test_find_config_file_search_paths(config_files): - """Test finding a config file in search paths.""" - # Test with search paths - found_path = find_config_file(search_paths=[config_files["temp_dir"].name]) - assert found_path in [config_files["yml_path"], config_files["yaml_path"]] + result = find_config_file(search_paths=[str(config_files["temp_dir"])]) + assert result in [config_files["yml_path"], config_files["yaml_path"]] - # Test with empty search paths - found_path = find_config_file(search_paths=["/non/existent/path"]) - assert found_path is None +def test_find_config_file_default_search(monkeypatch, tmp_path): + """Ensure the default search uses the current working directory.""" + config_path = tmp_path / "mcp-fuzzer.yml" + config_path.write_text("timeout: 5") -def test_load_config_file_yml(config_files): - """Test loading a .yml config file.""" - config_data = load_config_file(config_files["yml_path"]) - assert config_data["timeout"] == 60.0 - assert config_data["log_level"] == "DEBUG" - assert config_data["safety"]["enabled"] is True - assert config_data["safety"]["no_network"] is False - assert config_data["safety"]["local_hosts"] == ["localhost", "127.0.0.1"] + monkeypatch.chdir(tmp_path) + assert find_config_file() == str(config_path) + + +def test_find_config_file_missing(tmp_path): + result = find_config_file( + search_paths=[str(tmp_path)], + file_names=["nonexistent.yml"], + ) + assert result is None def test_load_config_file_yaml(config_files): - """Test loading a .yaml config file.""" - config_data = load_config_file(config_files["yaml_path"]) - assert config_data["timeout"] == 60.0 + config_data = load_config_file(config_files["yml_path"]) assert config_data["log_level"] == "DEBUG" + assert config_data["timeout"] == 60.0 assert config_data["safety"]["enabled"] is True - assert config_data["safety"]["no_network"] is False - assert config_data["safety"]["local_hosts"] == ["localhost", "127.0.0.1"] - -def test_load_config_file_invalid_format(config_files): - """Test loading a config file with an invalid format.""" - # Create a file with an invalid extension - invalid_path = os.path.join(config_files["temp_dir"].name, "invalid.txt") - with open(invalid_path, "w") as f: - f.write("invalid content") +def test_load_config_file_invalid_extension(tmp_path): + invalid = tmp_path / "config.txt" + invalid.write_text("timeout: 1") with pytest.raises(ConfigFileError): - load_config_file(invalid_path) + load_config_file(str(invalid)) def test_load_config_file_not_found(): - """Test loading a non-existent config file.""" with pytest.raises(ConfigFileError): - load_config_file("/non/existent/path") + load_config_file("/non/existent/path.yaml") -def test_load_config_file_invalid_yaml(config_files): - """Test loading an invalid YAML file.""" - invalid_yaml_path = os.path.join(config_files["temp_dir"].name, "invalid.yaml") - with open(invalid_yaml_path, "w") as f: - f.write("invalid: yaml: content:") - - with pytest.raises(ConfigFileError): - load_config_file(invalid_yaml_path) - - -def test_load_config_file_invalid_extension(config_files): - """Test loading a file with invalid extension.""" - invalid_ext_path = os.path.join(config_files["temp_dir"].name, "config.txt") - with open(invalid_ext_path, "w") as f: - f.write("valid: yaml") - +def test_load_config_file_invalid_yaml(tmp_path): + invalid = tmp_path / "broken.yaml" + invalid.write_text("timeout: [1,") with pytest.raises(ConfigFileError): - load_config_file(invalid_ext_path) + load_config_file(str(invalid)) +@patch("mcp_fuzzer.config.loader.load_custom_transports") @patch("mcp_fuzzer.config.loader.config") -def test_apply_config_file(mock_config, config_files): - """Test applying a config file.""" - # Test with explicit path +def test_apply_config_file_updates_global_state( + mock_config, mock_transports, config_files +): result = apply_config_file(config_path=config_files["yaml_path"]) assert result is True - mock_config.update.assert_called_once() + mock_transports.assert_called_once() + updated = mock_config.update.call_args[0][0] + assert updated.get("timeout") == 60.0 + assert mock_config.update.call_count == 1 - # Reset mock - mock_config.reset_mock() - - # Test with search paths - result = apply_config_file(search_paths=[config_files["temp_dir"].name]) - assert result is True - mock_config.update.assert_called_once() - - # Reset mock - mock_config.reset_mock() - - # Test with non-existent path - result = apply_config_file(config_path="/non/existent/path") - assert result is False - mock_config.update.assert_not_called() - - -def test_apply_config_file_handles_load_errors(config_files): - """apply_config_file should return False if load_config_file fails.""" - with patch( - "mcp_fuzzer.config.loader.config" - ) as mock_config, patch( - "mcp_fuzzer.config.loader.load_config_file" - ) as mock_load_config, patch( - "mcp_fuzzer.config.loader.load_custom_transports" - ) as mock_load_custom: - mock_load_config.side_effect = ConfigFileError("boom") - result = apply_config_file(config_path=config_files["yaml_path"]) - - assert result is False - mock_config.update.assert_not_called() - mock_load_custom.assert_not_called() - - -def test_apply_config_file_handles_custom_transport_errors(config_files): - """apply_config_file should return False if load_custom_transports fails.""" - with patch( - "mcp_fuzzer.config.loader.config" - ) as mock_config, patch( - "mcp_fuzzer.config.loader.load_config_file" - ) as mock_load_config, patch( - "mcp_fuzzer.config.loader.load_custom_transports" - ) as mock_load_custom: - mock_load_config.return_value = {"custom_transports": {}} - mock_load_custom.side_effect = ConfigFileError("bad transport") - result = apply_config_file(config_path=config_files["yaml_path"]) +@patch("mcp_fuzzer.config.loader.load_custom_transports") +@patch("mcp_fuzzer.config.loader.config") +def test_apply_config_file_returns_false_when_missing(mock_config, mock_transports): + result = apply_config_file(config_path="/nope.yaml") assert result is False mock_config.update.assert_not_called() + mock_transports.assert_not_called() -def test_get_config_schema(): - """Test getting the configuration schema.""" +def test_get_config_schema_includes_expected_fields(): schema = get_config_schema() - assert isinstance(schema, dict) - assert schema["type"] == "object" - assert "properties" in schema - assert "timeout" in schema["properties"] - assert "log_level" in schema["properties"] - assert "safety" in schema["properties"] + props = schema["properties"] + assert props["timeout"]["type"] == "number" + assert "custom_transports" in props + assert props["custom_transports"]["patternProperties"] + + +def test_config_loader_load_invokes_dependencies(): + parser = Mock(return_value={"timeout": 1}) + transport_loader = Mock() + loader = ConfigLoader( + discoverer=lambda *_args, **_kwargs: "config.yaml", + parser=parser, + transport_loader=transport_loader, + ) + data, path = loader.load() + parser.assert_called_once_with("config.yaml") + transport_loader.assert_called_once_with({"timeout": 1}) + assert data == {"timeout": 1} + assert path == "config.yaml" + + +def test_config_loader_load_returns_none_when_not_found(): + loader = ConfigLoader(discoverer=lambda *_: None) + data, path = loader.load() + assert data is None + assert path is None + + +def test_config_loader_apply_merges_data(): + parser = Mock(return_value={"log_level": "INFO"}) + loader = ConfigLoader( + discoverer=lambda *_: "config.yaml", + parser=parser, + transport_loader=Mock(), + ) + with patch("mcp_fuzzer.config.loader.config") as mock_config: + assert loader.apply() is True + mock_config.update.assert_called_once_with({"log_level": "INFO"}) + + +def test_config_loader_apply_handles_parser_errors(): + parser = Mock(side_effect=ConfigFileError("boom")) + loader = ConfigLoader( + discoverer=lambda *_: "config.yaml", + parser=parser, + transport_loader=Mock(), + ) + with patch("mcp_fuzzer.config.loader.config") as mock_config: + assert loader.apply() is False + mock_config.update.assert_not_called() + + +def test_config_loader_apply_handles_transport_errors(): + parser = Mock(return_value={"timeout": 5}) + transport_loader = Mock(side_effect=ConfigFileError("bad transport")) + loader = ConfigLoader( + discoverer=lambda *_: "config.yaml", + parser=parser, + transport_loader=transport_loader, + ) + with patch("mcp_fuzzer.config.loader.config") as mock_config: + assert loader.apply() is False + mock_config.update.assert_not_called() diff --git a/tests/unit/config/test_discovery_transports.py b/tests/unit/config/test_discovery_transports.py new file mode 100644 index 0000000..aede0a0 --- /dev/null +++ b/tests/unit/config/test_discovery_transports.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +"""Targeted tests for the config discovery and transport helpers.""" + +from __future__ import annotations + +import os +import tempfile + +import pytest + +from mcp_fuzzer.config.discovery import find_config_file +from mcp_fuzzer.config.transports import load_custom_transports +from mcp_fuzzer.exceptions import ConfigFileError +from mcp_fuzzer.transport.base import TransportProtocol +from mcp_fuzzer.transport.custom import list_custom_transports, registry + + +class DummyTransport(TransportProtocol): + """Simple transport stub used for registration tests.""" + + async def send_request(self, method: str, params=None): + return {"jsonrpc": "2.0", "result": {}, "id": 1} + + async def send_raw(self, payload): + return {"result": "ok"} + + async def send_notification(self, method: str, params=None): + return None + + async def _stream_request(self, payload): + yield {"jsonrpc": "2.0", "result": payload} + + +class NonTransport: + """Helper class that does not inherit TransportProtocol.""" + + +@pytest.fixture(autouse=True) +def clear_registry(): + """Always clear custom transport registry before and after each test.""" + registry.clear() + yield + registry.clear() + + +def test_find_config_file_prefers_explicit_path(tmp_path): + """Explicit config_path should be returned even if other files exist.""" + path = tmp_path / "mcp-fuzzer.yaml" + path.write_text("timeout: 5") + assert find_config_file(config_path=str(path), search_paths=[str(tmp_path)]) == str(path) + + +def test_find_config_file_search_paths(tmp_path): + """Search paths should be honored when they contain a config file.""" + path = tmp_path / "mcp-fuzzer.yml" + path.write_text("timeout: 10\n") + result = find_config_file(search_paths=[str(tmp_path)], file_names=["mcp-fuzzer.yml"]) + assert result == str(path) + + +def test_find_config_file_returns_none_when_missing(tmp_path): + """Return None when no configuration file exists in the requested paths.""" + assert find_config_file(search_paths=[str(tmp_path)]) is None + + +def test_load_custom_transports_registers_transport(): + """Valid transport entry should be registered in the custom registry.""" + config_data = { + "custom_transports": { + "dummy": { + "module": __name__, + "class": "DummyTransport", + "description": "Unit test transport", + } + } + } + + load_custom_transports(config_data) + transports = list_custom_transports() + assert "dummy" in transports + + +def test_load_custom_transports_missing_module_raises(): + """Non-existent module should raise ConfigFileError.""" + with pytest.raises(ConfigFileError): + load_custom_transports( + { + "custom_transports": { + "missing": {"module": "no.such.module", "class": "FooTransport"} + } + } + ) + + +def test_load_custom_transports_invalid_class_raises(): + """Classes that do not inherit TransportProtocol should fail validation.""" + with pytest.raises(ConfigFileError): + load_custom_transports( + { + "custom_transports": { + "invalid": {"module": __name__, "class": "NonTransport"} + } + } + ) From 2380c37d3b54cee4d70377b6058fd4137b0bff68 Mon Sep 17 00:00:00 2001 From: Prince Roshan Date: Fri, 28 Nov 2025 03:05:14 +0530 Subject: [PATCH 2/3] redesign --- .gitignore | 1 + mcp_fuzzer/cli/config_merge.py | 12 +- mcp_fuzzer/cli/startup_info.py | 4 +- mcp_fuzzer/cli/validators.py | 4 +- mcp_fuzzer/client/__init__.py | 67 ++++- mcp_fuzzer/client/adapters/__init__.py | 11 + mcp_fuzzer/client/adapters/config_adapter.py | 113 +++++++++ mcp_fuzzer/client/constants.py | 135 ++++++++++ mcp_fuzzer/client/ports/__init__.py | 11 + mcp_fuzzer/client/ports/config_port.py | 96 +++++++ mcp_fuzzer/client/tool_client.py | 3 +- mcp_fuzzer/config/__init__.py | 65 +++-- mcp_fuzzer/config/core/__init__.py | 68 +++++ mcp_fuzzer/config/{ => core}/constants.py | 2 +- mcp_fuzzer/config/core/manager.py | 83 +++++++ mcp_fuzzer/config/extensions/__init__.py | 7 + .../config/{ => extensions}/transports.py | 27 +- mcp_fuzzer/config/loader.py | 92 ------- mcp_fuzzer/config/loading/__init__.py | 17 ++ mcp_fuzzer/config/{ => loading}/discovery.py | 37 ++- mcp_fuzzer/config/loading/loader.py | 158 ++++++++++++ mcp_fuzzer/config/{ => loading}/parser.py | 6 +- mcp_fuzzer/config/loading/search_params.py | 25 ++ mcp_fuzzer/config/manager.py | 52 ---- mcp_fuzzer/config/schema.py | 195 --------------- mcp_fuzzer/config/schema/__init__.py | 7 + mcp_fuzzer/config/schema/builders.py | 235 ++++++++++++++++++ mcp_fuzzer/config/schema/composer.py | 43 ++++ mcp_fuzzer/fuzz_engine/runtime/watchdog.py | 7 +- .../realistic/protocol_type_strategy.py | 3 +- mcp_fuzzer/reports/reporter/__init__.py | 6 +- mcp_fuzzer/safety_system/policy.py | 4 +- mcp_fuzzer/transport/http.py | 3 +- mcp_fuzzer/transport/stdio.py | 3 +- mcp_fuzzer/transport/streamable_http.py | 3 +- tests/integration/test_custom_transport.py | 5 +- tests/integration/test_standardized_output.py | 4 +- tests/unit/cli/test_cli.py | 8 +- tests/unit/client/test_config_adapter.py | 131 ++++++++++ tests/unit/config/test_config_loader.py | 96 ++++++- tests/unit/config/test_constants.py | 116 +++++++++ .../unit/config/test_discovery_transports.py | 88 ++++++- tests/unit/config/test_manager.py | 118 +++++++++ tests/unit/config/test_parser.py | 92 +++++++ tests/unit/config/test_schema_builders.py | 149 +++++++++++ tests/unit/config/test_search_params.py | 63 +++++ 46 files changed, 2048 insertions(+), 427 deletions(-) create mode 100644 mcp_fuzzer/client/adapters/__init__.py create mode 100644 mcp_fuzzer/client/adapters/config_adapter.py create mode 100644 mcp_fuzzer/client/constants.py create mode 100644 mcp_fuzzer/client/ports/__init__.py create mode 100644 mcp_fuzzer/client/ports/config_port.py create mode 100644 mcp_fuzzer/config/core/__init__.py rename mcp_fuzzer/config/{ => core}/constants.py (96%) create mode 100644 mcp_fuzzer/config/core/manager.py create mode 100644 mcp_fuzzer/config/extensions/__init__.py rename mcp_fuzzer/config/{ => extensions}/transports.py (75%) delete mode 100644 mcp_fuzzer/config/loader.py create mode 100644 mcp_fuzzer/config/loading/__init__.py rename mcp_fuzzer/config/{ => loading}/discovery.py (53%) create mode 100644 mcp_fuzzer/config/loading/loader.py rename mcp_fuzzer/config/{ => loading}/parser.py (95%) create mode 100644 mcp_fuzzer/config/loading/search_params.py delete mode 100644 mcp_fuzzer/config/manager.py delete mode 100644 mcp_fuzzer/config/schema.py create mode 100644 mcp_fuzzer/config/schema/__init__.py create mode 100644 mcp_fuzzer/config/schema/builders.py create mode 100644 mcp_fuzzer/config/schema/composer.py create mode 100644 tests/unit/client/test_config_adapter.py create mode 100644 tests/unit/config/test_constants.py create mode 100644 tests/unit/config/test_manager.py create mode 100644 tests/unit/config/test_parser.py create mode 100644 tests/unit/config/test_schema_builders.py create mode 100644 tests/unit/config/test_search_params.py diff --git a/.gitignore b/.gitignore index 78ec16a..c68d2ed 100644 --- a/.gitignore +++ b/.gitignore @@ -18,6 +18,7 @@ doc_ref/ # C extensions *REDESIGN*.md *FLOW.md +*USAGE.md *.so notes.md /reports/ diff --git a/mcp_fuzzer/cli/config_merge.py b/mcp_fuzzer/cli/config_merge.py index a0abf82..3589f65 100644 --- a/mcp_fuzzer/cli/config_merge.py +++ b/mcp_fuzzer/cli/config_merge.py @@ -7,8 +7,8 @@ import logging from typing import Any -from ..config import config as global_config, load_config_file, apply_config_file from ..exceptions import ConfigFileError +from ..client.adapters import config_mediator from ..client.settings import CliConfig from ..client.transport.auth_port import resolve_auth_port from .parser import create_argument_parser @@ -60,7 +60,7 @@ def _transfer_config_to_args(args: argparse.Namespace) -> None: ] for config_key, args_key in mapping: - config_value = global_config.get(config_key) + config_value = config_mediator.get(config_key) default_value = defaults_parser.get_default(args_key) if default_value is argparse.SUPPRESS: # pragma: no cover default_value = None @@ -73,15 +73,15 @@ def build_cli_config(args: argparse.Namespace) -> CliConfig: """Merge CLI args, config files, and resolved auth.""" if args.config: try: - loaded = load_config_file(args.config) - global_config.update(loaded) + loaded = config_mediator.load_file(args.config) + config_mediator.update(loaded) except Exception as exc: raise ConfigFileError( f"Failed to load configuration file '{args.config}': {exc}" ) else: try: - apply_config_file() + config_mediator.apply_file() except Exception as exc: logging.debug(f"Error loading default configuration file: {exc}") @@ -135,7 +135,7 @@ def build_cli_config(args: argparse.Namespace) -> CliConfig: "auth_manager": auth_manager, } - global_config.update(merged) + config_mediator.update(merged) return CliConfig(args=args, merged=merged) diff --git a/mcp_fuzzer/cli/startup_info.py b/mcp_fuzzer/cli/startup_info.py index 03a6550..c513817 100644 --- a/mcp_fuzzer/cli/startup_info.py +++ b/mcp_fuzzer/cli/startup_info.py @@ -26,8 +26,8 @@ def print_startup_info(args: argparse.Namespace, config: dict | None = None) -> try: # Load and display the config file content import json - from ..config import load_config_file - raw_config = load_config_file(args.config) + from ..client.adapters import config_mediator + raw_config = config_mediator.load_file(args.config) config_json = json.dumps(raw_config, indent=2, sort_keys=True) console.print(f"[dim]{config_json}[/dim]") console.print() diff --git a/mcp_fuzzer/cli/validators.py b/mcp_fuzzer/cli/validators.py index e581a4f..3a1d921 100644 --- a/mcp_fuzzer/cli/validators.py +++ b/mcp_fuzzer/cli/validators.py @@ -11,7 +11,7 @@ from rich.console import Console from ..exceptions import ArgumentValidationError -from ..config import load_config_file +from ..client.adapters import config_mediator from ..transport.factory import create_transport from ..exceptions import MCPError, TransportError from ..env import ENVIRONMENT_VARIABLES, ValidationType @@ -65,7 +65,7 @@ def validate_arguments(self, args: argparse.Namespace) -> None: def validate_config_file(self, path: str) -> None: """Validate a config file and print success message.""" - load_config_file(path) + config_mediator.load_file(path) success_msg = ( "[green]:heavy_check_mark: Configuration file " f"'{path}' is valid[/green]" diff --git a/mcp_fuzzer/client/__init__.py b/mcp_fuzzer/client/__init__.py index d4b9305..f50701b 100644 --- a/mcp_fuzzer/client/__init__.py +++ b/mcp_fuzzer/client/__init__.py @@ -1,7 +1,72 @@ """Public client exports.""" from .base import MCPFuzzerClient +from .adapters import ConfigAdapter, config_mediator +from .constants import ( + CONTENT_TYPE_HEADER, + DEFAULT_FORCE_KILL_TIMEOUT, + DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT, + DEFAULT_HTTP_ACCEPT, + DEFAULT_MAX_TOTAL_FUZZING_TIME, + DEFAULT_MAX_TOOL_TIME, + DEFAULT_PROTOCOL_RUNS_PER_TYPE, + DEFAULT_PROTOCOL_VERSION, + DEFAULT_TIMEOUT, + DEFAULT_TOOL_RUNS, + DEFAULT_TOOL_TIMEOUT, + JSON_CONTENT_TYPE, + MCP_PROTOCOL_VERSION_HEADER, + MCP_SESSION_ID_HEADER, + PROCESS_CLEANUP_TIMEOUT, + PROCESS_FORCE_KILL_TIMEOUT, + PROCESS_TERMINATION_TIMEOUT, + PROCESS_WAIT_TIMEOUT, + SAFETY_ENV_ALLOWLIST, + SAFETY_HEADER_DENYLIST, + SAFETY_LOCAL_HOSTS, + SAFETY_NO_NETWORK_DEFAULT, + SAFETY_PROXY_ENV_DENYLIST, + SSE_CONTENT_TYPE, + WATCHDOG_DEFAULT_CHECK_INTERVAL, + WATCHDOG_EXTRA_BUFFER, + WATCHDOG_MAX_HANG_ADDITIONAL, +) +from .ports import ConfigPort UnifiedMCPFuzzerClient = MCPFuzzerClient -__all__ = ["MCPFuzzerClient", "UnifiedMCPFuzzerClient"] +__all__ = [ + "MCPFuzzerClient", + "UnifiedMCPFuzzerClient", + "ConfigPort", + "ConfigAdapter", + "config_mediator", + # Constants + "DEFAULT_PROTOCOL_VERSION", + "CONTENT_TYPE_HEADER", + "JSON_CONTENT_TYPE", + "SSE_CONTENT_TYPE", + "DEFAULT_HTTP_ACCEPT", + "MCP_SESSION_ID_HEADER", + "MCP_PROTOCOL_VERSION_HEADER", + "WATCHDOG_DEFAULT_CHECK_INTERVAL", + "WATCHDOG_EXTRA_BUFFER", + "WATCHDOG_MAX_HANG_ADDITIONAL", + "SAFETY_LOCAL_HOSTS", + "SAFETY_NO_NETWORK_DEFAULT", + "SAFETY_HEADER_DENYLIST", + "SAFETY_PROXY_ENV_DENYLIST", + "SAFETY_ENV_ALLOWLIST", + "DEFAULT_TOOL_RUNS", + "DEFAULT_PROTOCOL_RUNS_PER_TYPE", + "DEFAULT_TIMEOUT", + "DEFAULT_TOOL_TIMEOUT", + "DEFAULT_MAX_TOOL_TIME", + "DEFAULT_MAX_TOTAL_FUZZING_TIME", + "DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT", + "DEFAULT_FORCE_KILL_TIMEOUT", + "PROCESS_TERMINATION_TIMEOUT", + "PROCESS_FORCE_KILL_TIMEOUT", + "PROCESS_CLEANUP_TIMEOUT", + "PROCESS_WAIT_TIMEOUT", +] diff --git a/mcp_fuzzer/client/adapters/__init__.py b/mcp_fuzzer/client/adapters/__init__.py new file mode 100644 index 0000000..6256e32 --- /dev/null +++ b/mcp_fuzzer/client/adapters/__init__.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python3 +"""Adapter implementations for Port and Adapter pattern. + +Adapters implement the ports (interfaces) by adapting external modules. +This is where the mediation between modules happens. +""" + +from .config_adapter import ConfigAdapter, config_mediator + +__all__ = ["ConfigAdapter", "config_mediator"] + diff --git a/mcp_fuzzer/client/adapters/config_adapter.py b/mcp_fuzzer/client/adapters/config_adapter.py new file mode 100644 index 0000000..cba8f83 --- /dev/null +++ b/mcp_fuzzer/client/adapters/config_adapter.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +"""Configuration adapter implementation for Port and Adapter pattern. + +This module implements the ConfigPort interface by adapting the config module. +This is the adapter that mediates all configuration access. +""" + +from __future__ import annotations + +from typing import Any + +from ...config import ( + apply_config_file, + get_config_schema, + load_config_file, +) +from ...config.core.manager import config as global_config +from ..ports.config_port import ConfigPort + + +class ConfigAdapter(ConfigPort): + """Adapter that implements ConfigPort by delegating to the config module. + + This adapter acts as a mediator between other modules and the config module, + implementing the Port and Adapter (Hexagonal Architecture) pattern. + """ + + def __init__(self, config_instance: Any = None): + """Initialize the config adapter. + + Args: + config_instance: Optional configuration instance to use. + If None, uses the global config instance. + """ + self._config = config_instance or global_config + + def get(self, key: str, default: Any = None) -> Any: + """Get a configuration value by key. + + Args: + key: Configuration key + default: Default value if key not found + + Returns: + Configuration value or default + """ + return self._config.get(key, default) + + def set(self, key: str, value: Any) -> None: + """Set a configuration value. + + Args: + key: Configuration key + value: Configuration value + """ + self._config.set(key, value) + + def update(self, config_dict: dict[str, Any]) -> None: + """Update configuration with values from a dictionary. + + Args: + config_dict: Dictionary of configuration values to update + """ + self._config.update(config_dict) + + def load_file(self, file_path: str) -> dict[str, Any]: + """Load configuration from a file. + + Args: + file_path: Path to configuration file + + Returns: + Dictionary containing loaded configuration + + Raises: + ConfigFileError: If file cannot be loaded + """ + return load_config_file(file_path) + + def apply_file( + self, + config_path: str | None = None, + search_paths: list[str] | None = None, + file_names: list[str] | None = None, + ) -> bool: + """Load and apply configuration from a file. + + Args: + config_path: Explicit path to config file + search_paths: List of directories to search + file_names: List of file names to search for + + Returns: + True if configuration was loaded and applied, False otherwise + """ + return apply_config_file( + config_path=config_path, + search_paths=search_paths, + file_names=file_names, + ) + + def get_schema(self) -> dict[str, Any]: + """Get the JSON schema for configuration validation. + + Returns: + JSON schema dictionary + """ + return get_config_schema() + + +# Global instance for convenience (acts as the mediator) +config_mediator: ConfigPort = ConfigAdapter() + diff --git a/mcp_fuzzer/client/constants.py b/mcp_fuzzer/client/constants.py new file mode 100644 index 0000000..4e08737 --- /dev/null +++ b/mcp_fuzzer/client/constants.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 +"""Configuration constants exposed through client module. + +This module re-exports all configuration constants so other modules +can access them through the client mediator instead of directly +from the config module. + +Uses lazy imports to avoid circular dependencies. +""" + +# Lazy import to avoid circular dependencies +# Import happens at module level but after client module is initialized +def _get_constants(): + """Lazy import of constants to break circular dependencies.""" + from mcp_fuzzer.config.core.constants import ( + CONTENT_TYPE_HEADER, + DEFAULT_FORCE_KILL_TIMEOUT, + DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT, + DEFAULT_HTTP_ACCEPT, + DEFAULT_MAX_TOTAL_FUZZING_TIME, + DEFAULT_MAX_TOOL_TIME, + DEFAULT_PROTOCOL_RUNS_PER_TYPE, + DEFAULT_PROTOCOL_VERSION, + DEFAULT_TIMEOUT, + DEFAULT_TOOL_RUNS, + DEFAULT_TOOL_TIMEOUT, + JSON_CONTENT_TYPE, + MCP_PROTOCOL_VERSION_HEADER, + MCP_SESSION_ID_HEADER, + PROCESS_CLEANUP_TIMEOUT, + PROCESS_FORCE_KILL_TIMEOUT, + PROCESS_TERMINATION_TIMEOUT, + PROCESS_WAIT_TIMEOUT, + SAFETY_ENV_ALLOWLIST, + SAFETY_HEADER_DENYLIST, + SAFETY_LOCAL_HOSTS, + SAFETY_NO_NETWORK_DEFAULT, + SAFETY_PROXY_ENV_DENYLIST, + SSE_CONTENT_TYPE, + WATCHDOG_DEFAULT_CHECK_INTERVAL, + WATCHDOG_EXTRA_BUFFER, + WATCHDOG_MAX_HANG_ADDITIONAL, + ) + return { + "CONTENT_TYPE_HEADER": CONTENT_TYPE_HEADER, + "DEFAULT_FORCE_KILL_TIMEOUT": DEFAULT_FORCE_KILL_TIMEOUT, + "DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT": DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT, + "DEFAULT_HTTP_ACCEPT": DEFAULT_HTTP_ACCEPT, + "DEFAULT_MAX_TOTAL_FUZZING_TIME": DEFAULT_MAX_TOTAL_FUZZING_TIME, + "DEFAULT_MAX_TOOL_TIME": DEFAULT_MAX_TOOL_TIME, + "DEFAULT_PROTOCOL_RUNS_PER_TYPE": DEFAULT_PROTOCOL_RUNS_PER_TYPE, + "DEFAULT_PROTOCOL_VERSION": DEFAULT_PROTOCOL_VERSION, + "DEFAULT_TIMEOUT": DEFAULT_TIMEOUT, + "DEFAULT_TOOL_RUNS": DEFAULT_TOOL_RUNS, + "DEFAULT_TOOL_TIMEOUT": DEFAULT_TOOL_TIMEOUT, + "JSON_CONTENT_TYPE": JSON_CONTENT_TYPE, + "MCP_PROTOCOL_VERSION_HEADER": MCP_PROTOCOL_VERSION_HEADER, + "MCP_SESSION_ID_HEADER": MCP_SESSION_ID_HEADER, + "PROCESS_CLEANUP_TIMEOUT": PROCESS_CLEANUP_TIMEOUT, + "PROCESS_FORCE_KILL_TIMEOUT": PROCESS_FORCE_KILL_TIMEOUT, + "PROCESS_TERMINATION_TIMEOUT": PROCESS_TERMINATION_TIMEOUT, + "PROCESS_WAIT_TIMEOUT": PROCESS_WAIT_TIMEOUT, + "SAFETY_ENV_ALLOWLIST": SAFETY_ENV_ALLOWLIST, + "SAFETY_HEADER_DENYLIST": SAFETY_HEADER_DENYLIST, + "SAFETY_LOCAL_HOSTS": SAFETY_LOCAL_HOSTS, + "SAFETY_NO_NETWORK_DEFAULT": SAFETY_NO_NETWORK_DEFAULT, + "SAFETY_PROXY_ENV_DENYLIST": SAFETY_PROXY_ENV_DENYLIST, + "SSE_CONTENT_TYPE": SSE_CONTENT_TYPE, + "WATCHDOG_DEFAULT_CHECK_INTERVAL": WATCHDOG_DEFAULT_CHECK_INTERVAL, + "WATCHDOG_EXTRA_BUFFER": WATCHDOG_EXTRA_BUFFER, + "WATCHDOG_MAX_HANG_ADDITIONAL": WATCHDOG_MAX_HANG_ADDITIONAL, + } + +# Import constants lazily +_constants = _get_constants() + +# Export all constants +CONTENT_TYPE_HEADER = _constants["CONTENT_TYPE_HEADER"] +DEFAULT_FORCE_KILL_TIMEOUT = _constants["DEFAULT_FORCE_KILL_TIMEOUT"] +DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = _constants["DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT"] +DEFAULT_HTTP_ACCEPT = _constants["DEFAULT_HTTP_ACCEPT"] +DEFAULT_MAX_TOTAL_FUZZING_TIME = _constants["DEFAULT_MAX_TOTAL_FUZZING_TIME"] +DEFAULT_MAX_TOOL_TIME = _constants["DEFAULT_MAX_TOOL_TIME"] +DEFAULT_PROTOCOL_RUNS_PER_TYPE = _constants["DEFAULT_PROTOCOL_RUNS_PER_TYPE"] +DEFAULT_PROTOCOL_VERSION = _constants["DEFAULT_PROTOCOL_VERSION"] +DEFAULT_TIMEOUT = _constants["DEFAULT_TIMEOUT"] +DEFAULT_TOOL_RUNS = _constants["DEFAULT_TOOL_RUNS"] +DEFAULT_TOOL_TIMEOUT = _constants["DEFAULT_TOOL_TIMEOUT"] +JSON_CONTENT_TYPE = _constants["JSON_CONTENT_TYPE"] +MCP_PROTOCOL_VERSION_HEADER = _constants["MCP_PROTOCOL_VERSION_HEADER"] +MCP_SESSION_ID_HEADER = _constants["MCP_SESSION_ID_HEADER"] +PROCESS_CLEANUP_TIMEOUT = _constants["PROCESS_CLEANUP_TIMEOUT"] +PROCESS_FORCE_KILL_TIMEOUT = _constants["PROCESS_FORCE_KILL_TIMEOUT"] +PROCESS_TERMINATION_TIMEOUT = _constants["PROCESS_TERMINATION_TIMEOUT"] +PROCESS_WAIT_TIMEOUT = _constants["PROCESS_WAIT_TIMEOUT"] +SAFETY_ENV_ALLOWLIST = _constants["SAFETY_ENV_ALLOWLIST"] +SAFETY_HEADER_DENYLIST = _constants["SAFETY_HEADER_DENYLIST"] +SAFETY_LOCAL_HOSTS = _constants["SAFETY_LOCAL_HOSTS"] +SAFETY_NO_NETWORK_DEFAULT = _constants["SAFETY_NO_NETWORK_DEFAULT"] +SAFETY_PROXY_ENV_DENYLIST = _constants["SAFETY_PROXY_ENV_DENYLIST"] +SSE_CONTENT_TYPE = _constants["SSE_CONTENT_TYPE"] +WATCHDOG_DEFAULT_CHECK_INTERVAL = _constants["WATCHDOG_DEFAULT_CHECK_INTERVAL"] +WATCHDOG_EXTRA_BUFFER = _constants["WATCHDOG_EXTRA_BUFFER"] +WATCHDOG_MAX_HANG_ADDITIONAL = _constants["WATCHDOG_MAX_HANG_ADDITIONAL"] + +__all__ = [ + "DEFAULT_PROTOCOL_VERSION", + "CONTENT_TYPE_HEADER", + "JSON_CONTENT_TYPE", + "SSE_CONTENT_TYPE", + "DEFAULT_HTTP_ACCEPT", + "MCP_SESSION_ID_HEADER", + "MCP_PROTOCOL_VERSION_HEADER", + "WATCHDOG_DEFAULT_CHECK_INTERVAL", + "WATCHDOG_EXTRA_BUFFER", + "WATCHDOG_MAX_HANG_ADDITIONAL", + "SAFETY_LOCAL_HOSTS", + "SAFETY_NO_NETWORK_DEFAULT", + "SAFETY_HEADER_DENYLIST", + "SAFETY_PROXY_ENV_DENYLIST", + "SAFETY_ENV_ALLOWLIST", + "DEFAULT_TOOL_RUNS", + "DEFAULT_PROTOCOL_RUNS_PER_TYPE", + "DEFAULT_TIMEOUT", + "DEFAULT_TOOL_TIMEOUT", + "DEFAULT_MAX_TOOL_TIME", + "DEFAULT_MAX_TOTAL_FUZZING_TIME", + "DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT", + "DEFAULT_FORCE_KILL_TIMEOUT", + "PROCESS_TERMINATION_TIMEOUT", + "PROCESS_FORCE_KILL_TIMEOUT", + "PROCESS_CLEANUP_TIMEOUT", + "PROCESS_WAIT_TIMEOUT", +] + diff --git a/mcp_fuzzer/client/ports/__init__.py b/mcp_fuzzer/client/ports/__init__.py new file mode 100644 index 0000000..c1684fc --- /dev/null +++ b/mcp_fuzzer/client/ports/__init__.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python3 +"""Port interfaces for Port and Adapter pattern. + +Ports define the contracts (interfaces) that adapters must implement. +All modules should depend on ports, not concrete implementations. +""" + +from .config_port import ConfigPort + +__all__ = ["ConfigPort"] + diff --git a/mcp_fuzzer/client/ports/config_port.py b/mcp_fuzzer/client/ports/config_port.py new file mode 100644 index 0000000..53d5f5b --- /dev/null +++ b/mcp_fuzzer/client/ports/config_port.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +"""Configuration port interface for Port and Adapter pattern. + +This module defines the port (interface) for configuration access. +All modules should interact with configuration through this port, +not directly with the config module. +""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Any + + +class ConfigPort(ABC): + """Port interface for configuration access. + + This defines the contract that all configuration adapters must implement. + Modules should depend on this interface, not concrete implementations. + """ + + @abstractmethod + def get(self, key: str, default: Any = None) -> Any: + """Get a configuration value by key. + + Args: + key: Configuration key + default: Default value if key not found + + Returns: + Configuration value or default + """ + pass + + @abstractmethod + def set(self, key: str, value: Any) -> None: + """Set a configuration value. + + Args: + key: Configuration key + value: Configuration value + """ + pass + + @abstractmethod + def update(self, config_dict: dict[str, Any]) -> None: + """Update configuration with values from a dictionary. + + Args: + config_dict: Dictionary of configuration values to update + """ + pass + + @abstractmethod + def load_file(self, file_path: str) -> dict[str, Any]: + """Load configuration from a file. + + Args: + file_path: Path to configuration file + + Returns: + Dictionary containing loaded configuration + + Raises: + ConfigFileError: If file cannot be loaded + """ + pass + + @abstractmethod + def apply_file( + self, + config_path: str | None = None, + search_paths: list[str] | None = None, + file_names: list[str] | None = None, + ) -> bool: + """Load and apply configuration from a file. + + Args: + config_path: Explicit path to config file + search_paths: List of directories to search + file_names: List of file names to search for + + Returns: + True if configuration was loaded and applied, False otherwise + """ + pass + + @abstractmethod + def get_schema(self) -> dict[str, Any]: + """Get the JSON schema for configuration validation. + + Returns: + JSON schema dictionary + """ + pass + diff --git a/mcp_fuzzer/client/tool_client.py b/mcp_fuzzer/client/tool_client.py index 9dd6786..eae45a3 100644 --- a/mcp_fuzzer/client/tool_client.py +++ b/mcp_fuzzer/client/tool_client.py @@ -12,7 +12,8 @@ from ..auth import AuthManager from ..fuzz_engine.fuzzer import ToolFuzzer from ..safety_system.safety import SafetyFilter, SafetyProvider -from ..config import ( +# Import constants directly from config (constants are values, not behavior) +from ..config.core.constants import ( DEFAULT_TOOL_RUNS, DEFAULT_MAX_TOOL_TIME, DEFAULT_MAX_TOTAL_FUZZING_TIME, diff --git a/mcp_fuzzer/config/__init__.py b/mcp_fuzzer/config/__init__.py index b83f404..1ad00b8 100644 --- a/mcp_fuzzer/config/__init__.py +++ b/mcp_fuzzer/config/__init__.py @@ -1,46 +1,53 @@ #!/usr/bin/env python3 """Configuration module for MCP Fuzzer.""" -# Import all constants explicitly -from .constants import ( - DEFAULT_PROTOCOL_VERSION, +# Import all constants and core functionality +from .core import ( CONTENT_TYPE_HEADER, - JSON_CONTENT_TYPE, - SSE_CONTENT_TYPE, + DEFAULT_FORCE_KILL_TIMEOUT, + DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT, DEFAULT_HTTP_ACCEPT, - MCP_SESSION_ID_HEADER, + DEFAULT_MAX_TOTAL_FUZZING_TIME, + DEFAULT_MAX_TOOL_TIME, + DEFAULT_PROTOCOL_RUNS_PER_TYPE, + DEFAULT_PROTOCOL_VERSION, + DEFAULT_TIMEOUT, + DEFAULT_TOOL_RUNS, + DEFAULT_TOOL_TIMEOUT, + JSON_CONTENT_TYPE, MCP_PROTOCOL_VERSION_HEADER, - WATCHDOG_DEFAULT_CHECK_INTERVAL, - WATCHDOG_EXTRA_BUFFER, - WATCHDOG_MAX_HANG_ADDITIONAL, + MCP_SESSION_ID_HEADER, + PROCESS_CLEANUP_TIMEOUT, + PROCESS_FORCE_KILL_TIMEOUT, + PROCESS_TERMINATION_TIMEOUT, + PROCESS_WAIT_TIMEOUT, + SAFETY_ENV_ALLOWLIST, + SAFETY_HEADER_DENYLIST, SAFETY_LOCAL_HOSTS, SAFETY_NO_NETWORK_DEFAULT, - SAFETY_HEADER_DENYLIST, SAFETY_PROXY_ENV_DENYLIST, - SAFETY_ENV_ALLOWLIST, - DEFAULT_TOOL_RUNS, - DEFAULT_PROTOCOL_RUNS_PER_TYPE, - DEFAULT_TIMEOUT, - DEFAULT_TOOL_TIMEOUT, - DEFAULT_MAX_TOOL_TIME, - DEFAULT_MAX_TOTAL_FUZZING_TIME, - DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT, - DEFAULT_FORCE_KILL_TIMEOUT, + SSE_CONTENT_TYPE, + WATCHDOG_DEFAULT_CHECK_INTERVAL, + WATCHDOG_EXTRA_BUFFER, + WATCHDOG_MAX_HANG_ADDITIONAL, + config, ) -# Import configuration manager and global instance -from .manager import config - # Import loader functions -from .loader import ( +from .loading import ( ConfigLoader, + ConfigSearchParams, + apply_config_file, find_config_file, load_config_file, - apply_config_file, - get_config_schema, - load_custom_transports, ) +# Import schema +from .schema import get_config_schema + +# Import extensions +from .extensions import load_custom_transports + __all__ = [ # Constants "DEFAULT_PROTOCOL_VERSION", @@ -66,6 +73,10 @@ "DEFAULT_MAX_TOTAL_FUZZING_TIME", "DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT", "DEFAULT_FORCE_KILL_TIMEOUT", + "PROCESS_TERMINATION_TIMEOUT", + "PROCESS_FORCE_KILL_TIMEOUT", + "PROCESS_CLEANUP_TIMEOUT", + "PROCESS_WAIT_TIMEOUT", # Manager "config", # Loader helpers @@ -75,4 +86,6 @@ "apply_config_file", "get_config_schema", "load_custom_transports", + # Search parameters + "ConfigSearchParams", ] diff --git a/mcp_fuzzer/config/core/__init__.py b/mcp_fuzzer/config/core/__init__.py new file mode 100644 index 0000000..b2703a4 --- /dev/null +++ b/mcp_fuzzer/config/core/__init__.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +"""Core configuration management.""" + +from .constants import ( + CONTENT_TYPE_HEADER, + DEFAULT_FORCE_KILL_TIMEOUT, + DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT, + DEFAULT_HTTP_ACCEPT, + DEFAULT_MAX_TOTAL_FUZZING_TIME, + DEFAULT_MAX_TOOL_TIME, + DEFAULT_PROTOCOL_RUNS_PER_TYPE, + DEFAULT_PROTOCOL_VERSION, + DEFAULT_TIMEOUT, + DEFAULT_TOOL_RUNS, + DEFAULT_TOOL_TIMEOUT, + JSON_CONTENT_TYPE, + MCP_PROTOCOL_VERSION_HEADER, + MCP_SESSION_ID_HEADER, + PROCESS_CLEANUP_TIMEOUT, + PROCESS_FORCE_KILL_TIMEOUT, + PROCESS_TERMINATION_TIMEOUT, + PROCESS_WAIT_TIMEOUT, + SAFETY_ENV_ALLOWLIST, + SAFETY_HEADER_DENYLIST, + SAFETY_LOCAL_HOSTS, + SAFETY_NO_NETWORK_DEFAULT, + SAFETY_PROXY_ENV_DENYLIST, + SSE_CONTENT_TYPE, + WATCHDOG_DEFAULT_CHECK_INTERVAL, + WATCHDOG_EXTRA_BUFFER, + WATCHDOG_MAX_HANG_ADDITIONAL, +) +from .manager import Configuration, config + +__all__ = [ + # Constants + "DEFAULT_PROTOCOL_VERSION", + "CONTENT_TYPE_HEADER", + "JSON_CONTENT_TYPE", + "SSE_CONTENT_TYPE", + "DEFAULT_HTTP_ACCEPT", + "MCP_SESSION_ID_HEADER", + "MCP_PROTOCOL_VERSION_HEADER", + "WATCHDOG_DEFAULT_CHECK_INTERVAL", + "WATCHDOG_EXTRA_BUFFER", + "WATCHDOG_MAX_HANG_ADDITIONAL", + "SAFETY_LOCAL_HOSTS", + "SAFETY_NO_NETWORK_DEFAULT", + "SAFETY_HEADER_DENYLIST", + "SAFETY_PROXY_ENV_DENYLIST", + "SAFETY_ENV_ALLOWLIST", + "DEFAULT_TOOL_RUNS", + "DEFAULT_PROTOCOL_RUNS_PER_TYPE", + "DEFAULT_TIMEOUT", + "DEFAULT_TOOL_TIMEOUT", + "DEFAULT_MAX_TOOL_TIME", + "DEFAULT_MAX_TOTAL_FUZZING_TIME", + "DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT", + "DEFAULT_FORCE_KILL_TIMEOUT", + "PROCESS_TERMINATION_TIMEOUT", + "PROCESS_FORCE_KILL_TIMEOUT", + "PROCESS_CLEANUP_TIMEOUT", + "PROCESS_WAIT_TIMEOUT", + # Manager + "Configuration", + "config", +] + diff --git a/mcp_fuzzer/config/constants.py b/mcp_fuzzer/config/core/constants.py similarity index 96% rename from mcp_fuzzer/config/constants.py rename to mcp_fuzzer/config/core/constants.py index 9677b7a..2851d26 100644 --- a/mcp_fuzzer/config/constants.py +++ b/mcp_fuzzer/config/core/constants.py @@ -23,7 +23,7 @@ # Safety defaults # Hosts allowed for network operations by default. Keep local-only. SAFETY_LOCAL_HOSTS: set[str] = {"localhost", "127.0.0.1", "::1"} -# Default to deny network to non-local hosts +# Default to allow network access (set to True to deny by default) SAFETY_NO_NETWORK_DEFAULT: bool = False # Headers that should never be forwarded by default to avoid leakage SAFETY_HEADER_DENYLIST: set[str] = {"authorization", "cookie"} diff --git a/mcp_fuzzer/config/core/manager.py b/mcp_fuzzer/config/core/manager.py new file mode 100644 index 0000000..e725614 --- /dev/null +++ b/mcp_fuzzer/config/core/manager.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python3 +"""Configuration management for MCP Fuzzer.""" + +import os +from typing import Any + + +def _get_float_from_env(key: str, default: float) -> float: + """Get a float value from environment variable. + + Args: + key: Environment variable name + default: Default value if not set or invalid + + Returns: + Float value from environment or default + """ + val = os.getenv(key) + if val is None: + return default + try: + return float(val) + except (TypeError, ValueError): + return default + + +def _get_bool_from_env(key: str, default: bool = False) -> bool: + """Get a boolean value from environment variable. + + Args: + key: Environment variable name + default: Default value if not set + + Returns: + Boolean value from environment or default + """ + val = os.getenv(key) + if val is None: + return default + return val.strip().lower() in {"1", "true", "yes", "on"} + + +class Configuration: + """Centralized configuration management for MCP Fuzzer.""" + + def __init__(self): + self._config: dict[str, Any] = {} + self._load_from_env() + + def _load_from_env(self) -> None: + """Load configuration values from environment variables.""" + self._config["timeout"] = _get_float_from_env("MCP_FUZZER_TIMEOUT", 30.0) + self._config["log_level"] = os.getenv("MCP_FUZZER_LOG_LEVEL", "INFO") + self._config["safety_enabled"] = _get_bool_from_env( + "MCP_FUZZER_SAFETY_ENABLED", False + ) + self._config["fs_root"] = os.getenv( + "MCP_FUZZER_FS_ROOT", os.path.expanduser("~/.mcp_fuzzer") + ) + self._config["http_timeout"] = _get_float_from_env( + "MCP_FUZZER_HTTP_TIMEOUT", 30.0 + ) + self._config["sse_timeout"] = _get_float_from_env( + "MCP_FUZZER_SSE_TIMEOUT", 30.0 + ) + self._config["stdio_timeout"] = _get_float_from_env( + "MCP_FUZZER_STDIO_TIMEOUT", 30.0 + ) + + def get(self, key: str, default: Any = None) -> Any: + """Get a configuration value by key.""" + return self._config.get(key, default) + + def set(self, key: str, value: Any) -> None: + """Set a configuration value.""" + self._config[key] = value + + def update(self, config_dict: dict[str, Any]) -> None: + """Update configuration with values from a dictionary.""" + self._config.update(config_dict) + +# Global configuration instance +config = Configuration() \ No newline at end of file diff --git a/mcp_fuzzer/config/extensions/__init__.py b/mcp_fuzzer/config/extensions/__init__.py new file mode 100644 index 0000000..34c0c30 --- /dev/null +++ b/mcp_fuzzer/config/extensions/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 +"""Configuration extensions (custom transports, etc.).""" + +from .transports import load_custom_transports + +__all__ = ["load_custom_transports"] + diff --git a/mcp_fuzzer/config/transports.py b/mcp_fuzzer/config/extensions/transports.py similarity index 75% rename from mcp_fuzzer/config/transports.py rename to mcp_fuzzer/config/extensions/transports.py index af92289..892af94 100644 --- a/mcp_fuzzer/config/transports.py +++ b/mcp_fuzzer/config/extensions/transports.py @@ -7,9 +7,9 @@ import logging from typing import Any -from ..exceptions import ConfigFileError, MCPError -from ..transport.custom import register_custom_transport -from ..transport.base import TransportProtocol +from ...exceptions import ConfigFileError, MCPError +from ...transport.custom import register_custom_transport +from ...transport.base import TransportProtocol logger = logging.getLogger(__name__) @@ -29,12 +29,13 @@ def load_custom_transports(config_data: dict[str, Any]) -> None: module = importlib.import_module(module_path) transport_class = getattr(module, class_name) - if not isinstance(transport_class, type): + try: + if not issubclass(transport_class, TransportProtocol): + raise ConfigFileError( + f"{module_path}.{class_name} must subclass TransportProtocol" + ) + except TypeError: raise ConfigFileError(f"{module_path}.{class_name} is not a class") - if not issubclass(transport_class, TransportProtocol): - raise ConfigFileError( - f"{module_path}.{class_name} must subclass TransportProtocol" - ) description = transport_config.get("description", "") config_schema = transport_config.get("config_schema") @@ -61,14 +62,18 @@ def load_custom_transports(config_data: dict[str, Any]) -> None: ) logger.info( - f"Loaded custom transport '{transport_name}' from " - f"{module_path}.{class_name}" + "Loaded custom transport '%s' from %s.%s", + transport_name, + module_path, + class_name, ) except MCPError: raise except Exception as e: - logger.error(f"Failed to load custom transport '{transport_name}': {e}") + logger.error( + "Failed to load custom transport '%s': %s", transport_name, e + ) raise ConfigFileError( f"Failed to load custom transport '{transport_name}': {e}" ) from e diff --git a/mcp_fuzzer/config/loader.py b/mcp_fuzzer/config/loader.py deleted file mode 100644 index 876d283..0000000 --- a/mcp_fuzzer/config/loader.py +++ /dev/null @@ -1,92 +0,0 @@ -#!/usr/bin/env python3 -"""Configuration loader helpers that glue the discovery/parser stack.""" - -from __future__ import annotations - -import logging -from typing import Any, Callable, Tuple - -from .discovery import find_config_file -from .manager import config -from .parser import load_config_file -from .schema import get_config_schema # noqa: F401 (exported for consumers) -from .transports import load_custom_transports -from ..exceptions import ConfigFileError, MCPError - -logger = logging.getLogger(__name__) - -ConfigDict = dict[str, Any] -FileDiscoverer = Callable[ - [str | None, list[str] | None, list[str] | None], str | None -] -ConfigParser = Callable[[str], ConfigDict] -TransportLoader = Callable[[ConfigDict], None] - - -class ConfigLoader: - """Load configuration files with injectable discovery and parser implementations.""" - - def __init__( - self, - discoverer: FileDiscoverer | None = None, - parser: ConfigParser | None = None, - transport_loader: TransportLoader | None = None, - ): - self.discoverer = discoverer or find_config_file - self.parser = parser or load_config_file - self.transport_loader = transport_loader or load_custom_transports - - def load( - self, - config_path: str | None = None, - search_paths: list[str] | None = None, - file_names: list[str] | None = None, - ) -> Tuple[ConfigDict | None, str | None]: - """Return the configuration dictionary and source file path.""" - file_path = self.discoverer(config_path, search_paths, file_names) - if not file_path: - logger.debug("No configuration file found") - return None, None - - logger.info("Loading configuration from %s", file_path) - try: - config_data = self.parser(file_path) - self.transport_loader(config_data) - except (ConfigFileError, MCPError): - logger.exception("Failed to load configuration from %s", file_path) - raise - - return config_data, file_path - - def apply( - self, - config_path: str | None = None, - search_paths: list[str] | None = None, - file_names: list[str] | None = None, - ) -> bool: - """Load configuration and merge it into the runtime state.""" - try: - config_data, file_path = self.load(config_path, search_paths, file_names) - except (ConfigFileError, MCPError): - return False - - if not file_path: - return False - - config.update(config_data or {}) - return True - - -def apply_config_file( - config_path: str | None = None, - search_paths: list[str] | None = None, - file_names: list[str] | None = None, -) -> bool: - """Convenience helper that uses the default loader to update global config.""" - - loader = ConfigLoader() - return loader.apply( - config_path=config_path, - search_paths=search_paths, - file_names=file_names, - ) diff --git a/mcp_fuzzer/config/loading/__init__.py b/mcp_fuzzer/config/loading/__init__.py new file mode 100644 index 0000000..69fc315 --- /dev/null +++ b/mcp_fuzzer/config/loading/__init__.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python3 +"""Configuration file loading and discovery.""" + +from .discovery import find_config_file, find_config_file_from_params +from .loader import ConfigLoader, apply_config_file +from .parser import load_config_file # noqa: F401 +from .search_params import ConfigSearchParams + +__all__ = [ + "ConfigLoader", + "find_config_file", + "find_config_file_from_params", + "load_config_file", + "apply_config_file", + "ConfigSearchParams", +] + diff --git a/mcp_fuzzer/config/discovery.py b/mcp_fuzzer/config/loading/discovery.py similarity index 53% rename from mcp_fuzzer/config/discovery.py rename to mcp_fuzzer/config/loading/discovery.py index 4534f39..58021ad 100644 --- a/mcp_fuzzer/config/discovery.py +++ b/mcp_fuzzer/config/loading/discovery.py @@ -6,6 +6,8 @@ import os from pathlib import Path +from .search_params import ConfigSearchParams + def find_config_file( config_path: str | None = None, @@ -22,15 +24,46 @@ def find_config_file( Returns: Path to the found config file or None if not found """ - if config_path and os.path.isfile(config_path): - return config_path + params = ConfigSearchParams( + config_path=config_path, + search_paths=search_paths, + file_names=file_names, + ) + return _find_config_file_impl(params) + + +def find_config_file_from_params(params: ConfigSearchParams) -> str | None: + """Find a configuration file using ConfigSearchParams. + + Args: + params: Configuration search parameters + + Returns: + Path to the found config file or None if not found + """ + return _find_config_file_impl(params) + + +def _find_config_file_impl(params: ConfigSearchParams) -> str | None: + """Internal implementation of config file discovery. + + Args: + params: Configuration search parameters + + Returns: + Path to the found config file or None if not found + """ + if params.config_path and os.path.isfile(params.config_path): + return params.config_path + search_paths = params.search_paths if search_paths is None: search_paths = [ os.getcwd(), str(Path.home() / ".config" / "mcp-fuzzer"), ] + file_names = params.file_names if file_names is None: file_names = ["mcp-fuzzer.yml", "mcp-fuzzer.yaml"] diff --git a/mcp_fuzzer/config/loading/loader.py b/mcp_fuzzer/config/loading/loader.py new file mode 100644 index 0000000..b2df967 --- /dev/null +++ b/mcp_fuzzer/config/loading/loader.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python3 +"""Configuration loader helpers that glue the discovery/parser stack.""" + +from __future__ import annotations + +import logging +from typing import Any, Callable, Tuple + +from ..core.manager import Configuration, config +from ..extensions.transports import load_custom_transports +from ..schema.composer import get_config_schema # noqa: F401 (exported for consumers) +from .discovery import find_config_file +from .parser import load_config_file +from .search_params import ConfigSearchParams +from ...exceptions import ConfigFileError, MCPError + +logger = logging.getLogger(__name__) + +ConfigDict = dict[str, Any] +FileDiscoverer = Callable[ + [str | None, list[str] | None, list[str] | None], str | None +] +ConfigParser = Callable[[str], ConfigDict] +TransportLoader = Callable[[ConfigDict], None] + + +class ConfigLoader: + """Load configuration files with injectable discovery and parser implementations. + + This class reduces coupling by accepting a Configuration instance rather than + always using the global config object. + """ + + def __init__( + self, + discoverer: FileDiscoverer | None = None, + parser: ConfigParser | None = None, + transport_loader: TransportLoader | None = None, + config_instance: Configuration | None = None, + ): + self.discoverer = discoverer or find_config_file + self.parser = parser or load_config_file + self.transport_loader = transport_loader or load_custom_transports + self.config = config_instance or config + + def load( + self, + config_path: str | None = None, + search_paths: list[str] | None = None, + file_names: list[str] | None = None, + ) -> Tuple[ConfigDict | None, str | None]: + """Return the configuration dictionary and source file path. + + Args: + config_path: Explicit path to config file + search_paths: List of directories to search + file_names: List of file names to search for + + Returns: + Tuple of (config_dict, file_path) or (None, None) if not found + """ + file_path = self.discoverer(config_path, search_paths, file_names) + if not file_path: + logger.debug("No configuration file found") + return None, None + + logger.debug("Loading configuration from %s", file_path) + try: + config_data = self.parser(file_path) + self.transport_loader(config_data) + except (ConfigFileError, MCPError): + logger.exception("Failed to load configuration from %s", file_path) + raise + + return config_data, file_path + + def load_from_params( + self, params: ConfigSearchParams + ) -> Tuple[ConfigDict | None, str | None]: + """Load configuration using ConfigSearchParams. + + Args: + params: Configuration search parameters + + Returns: + Tuple of (config_dict, file_path) or (None, None) if not found + """ + return self.load( + config_path=params.config_path, + search_paths=params.search_paths, + file_names=params.file_names, + ) + + def apply( + self, + config_path: str | None = None, + search_paths: list[str] | None = None, + file_names: list[str] | None = None, + ) -> bool: + """Load configuration and merge it into the runtime state. + + Args: + config_path: Explicit path to config file + search_paths: List of directories to search + file_names: List of file names to search for + + Returns: + True if configuration was loaded and applied, False otherwise + """ + try: + config_data, file_path = self.load(config_path, search_paths, file_names) + except (ConfigFileError, MCPError) as e: + logger.debug("Failed to apply configuration: %s", e) + return False + + if not file_path: + return False + + self.config.update(config_data or {}) + return True + + def apply_from_params(self, params: ConfigSearchParams) -> bool: + """Apply configuration using ConfigSearchParams. + + Args: + params: Configuration search parameters + + Returns: + True if configuration was loaded and applied, False otherwise + """ + return self.apply( + config_path=params.config_path, + search_paths=params.search_paths, + file_names=params.file_names, + ) + + +def apply_config_file( + config_path: str | None = None, + search_paths: list[str] | None = None, + file_names: list[str] | None = None, +) -> bool: + """Convenience helper that uses the default loader to update global config. + + Args: + config_path: Explicit path to config file + search_paths: List of directories to search + file_names: List of file names to search for + + Returns: + True if configuration was loaded and applied, False otherwise + """ + loader = ConfigLoader() + return loader.apply( + config_path=config_path, + search_paths=search_paths, + file_names=file_names, + ) diff --git a/mcp_fuzzer/config/parser.py b/mcp_fuzzer/config/loading/parser.py similarity index 95% rename from mcp_fuzzer/config/parser.py rename to mcp_fuzzer/config/loading/parser.py index a04b1ce..bf45152 100644 --- a/mcp_fuzzer/config/parser.py +++ b/mcp_fuzzer/config/loading/parser.py @@ -8,7 +8,7 @@ import yaml -from ..exceptions import ConfigFileError +from ...exceptions import ConfigFileError def load_config_file(file_path: str) -> dict[str, Any]: @@ -37,7 +37,7 @@ def load_config_file(file_path: str) -> dict[str, Any]: return yaml.safe_load(f) or {} except yaml.YAMLError as e: raise ConfigFileError( - f"Error parsing YAML configuration file {file_path}: {str(e)}" + f"Error parsing YAML configuration file {file_path}: {e}" ) except PermissionError: raise ConfigFileError( @@ -45,5 +45,5 @@ def load_config_file(file_path: str) -> dict[str, Any]: ) except Exception as e: raise ConfigFileError( - f"Unexpected error reading configuration file {file_path}: {str(e)}" + f"Unexpected error reading configuration file {file_path}: {e}" ) diff --git a/mcp_fuzzer/config/loading/search_params.py b/mcp_fuzzer/config/loading/search_params.py new file mode 100644 index 0000000..f41d051 --- /dev/null +++ b/mcp_fuzzer/config/loading/search_params.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +"""Configuration search parameters dataclass to group related parameters.""" + +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass +class ConfigSearchParams: + """Parameters for searching and loading configuration files. + + Groups related configuration search parameters to reduce parameter + list length and improve code clarity. + + Attributes: + config_path: Explicit path to config file, takes precedence if provided + search_paths: List of directories to search for config files + file_names: List of file names to search for + """ + + config_path: str | None = None + search_paths: list[str] | None = None + file_names: list[str] | None = None + diff --git a/mcp_fuzzer/config/manager.py b/mcp_fuzzer/config/manager.py deleted file mode 100644 index f08506f..0000000 --- a/mcp_fuzzer/config/manager.py +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python3 -"""Configuration management for MCP Fuzzer.""" - -import os -from typing import Any - -class Configuration: - """Centralized configuration management for MCP Fuzzer.""" - - def __init__(self): - self._config: dict[str, Any] = {} - self._load_from_env() - - def _load_from_env(self) -> None: - """Load configuration values from environment variables.""" - - def _get_float(key: str, default: float) -> float: - try: - return float(os.getenv(key, str(default))) - except (TypeError, ValueError): - return default - - def _get_bool(key: str, default: bool = False) -> bool: - val = os.getenv(key) - if val is None: - return default - return val.strip().lower() in {"1", "true", "yes", "on"} - - self._config["timeout"] = _get_float("MCP_FUZZER_TIMEOUT", 30.0) - self._config["log_level"] = os.getenv("MCP_FUZZER_LOG_LEVEL", "INFO") - self._config["safety_enabled"] = _get_bool("MCP_FUZZER_SAFETY_ENABLED", False) - self._config["fs_root"] = os.getenv( - "MCP_FUZZER_FS_ROOT", os.path.expanduser("~/.mcp_fuzzer") - ) - self._config["http_timeout"] = _get_float("MCP_FUZZER_HTTP_TIMEOUT", 30.0) - self._config["sse_timeout"] = _get_float("MCP_FUZZER_SSE_TIMEOUT", 30.0) - self._config["stdio_timeout"] = _get_float("MCP_FUZZER_STDIO_TIMEOUT", 30.0) - - def get(self, key: str, default: Any = None) -> Any: - """Get a configuration value by key.""" - return self._config.get(key, default) - - def set(self, key: str, value: Any) -> None: - """Set a configuration value.""" - self._config[key] = value - - def update(self, config_dict: dict[str, Any]) -> None: - """Update configuration with values from a dictionary.""" - self._config.update(config_dict) - -# Global configuration instance -config = Configuration() \ No newline at end of file diff --git a/mcp_fuzzer/config/schema.py b/mcp_fuzzer/config/schema.py deleted file mode 100644 index fda83e1..0000000 --- a/mcp_fuzzer/config/schema.py +++ /dev/null @@ -1,195 +0,0 @@ -#!/usr/bin/env python3 -"""Schema helpers for configuration validation.""" - -from __future__ import annotations - -from typing import Any - - -def get_config_schema() -> dict[str, Any]: - """Return the JSON schema describing the configuration structure.""" - return { - "type": "object", - "properties": { - "timeout": {"type": "number", "description": "Default timeout in seconds"}, - "tool_timeout": { - "type": "number", - "description": "Tool-specific timeout in seconds", - }, - "log_level": { - "type": "string", - "enum": ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], - }, - "safety_enabled": { - "type": "boolean", - "description": "Whether safety features are enabled", - }, - "fs_root": { - "type": "string", - "description": "Root directory for file operations", - }, - "http_timeout": { - "type": "number", - "description": "HTTP transport timeout in seconds", - }, - "sse_timeout": { - "type": "number", - "description": "SSE transport timeout in seconds", - }, - "stdio_timeout": { - "type": "number", - "description": "STDIO transport timeout in seconds", - }, - "mode": {"type": "string", "enum": ["tools", "protocol", "both"]}, - "phase": {"type": "string", "enum": ["realistic", "aggressive", "both"]}, - "protocol": { - "type": "string", - "enum": ["http", "https", "sse", "stdio", "streamablehttp"], - }, - "endpoint": {"type": "string", "description": "Server endpoint URL"}, - "runs": {"type": "integer", "description": "Number of fuzzing runs"}, - "runs_per_type": { - "type": "integer", - "description": "Number of runs per protocol type", - }, - "protocol_type": { - "type": "string", - "description": "Specific protocol type to fuzz", - }, - "no_network": {"type": "boolean", "description": "Disable network access"}, - "allow_hosts": { - "type": "array", - "items": {"type": "string"}, - "description": "List of allowed hosts", - }, - "max_concurrency": { - "type": "integer", - "description": "Maximum concurrent operations", - }, - "auth": { - "type": "object", - "properties": { - "providers": { - "type": "array", - "items": { - "type": "object", - "properties": { - "type": { - "type": "string", - "enum": ["api_key", "basic", "oauth", "custom"], - }, - "id": {"type": "string"}, - "config": {"type": "object"}, - }, - "required": ["type", "id"], - }, - }, - "mappings": { - "type": "object", - "additionalProperties": {"type": "string"}, - }, - }, - }, - "custom_transports": { - "type": "object", - "description": "Configuration for custom transport mechanisms", - "patternProperties": { - "^[a-zA-Z][a-zA-Z0-9_]*$": { - "type": "object", - "properties": { - "module": { - "type": "string", - "description": "Python module containing transport", - }, - "class": { - "type": "string", - "description": "Transport class name", - }, - "description": { - "type": "string", - "description": "Human-readable description", - }, - "factory": { - "type": "string", - "description": "Dotted path to factory function " - "(e.g., pkg.mod.create_transport)", - }, - "config_schema": { - "type": "object", - "description": "JSON schema for transport config", - }, - }, - "additionalProperties": False, - "required": ["module", "class"], - } - }, - "additionalProperties": False, - }, - "safety": { - "type": "object", - "properties": { - "enabled": {"type": "boolean"}, - "local_hosts": {"type": "array", "items": {"type": "string"}}, - "no_network": {"type": "boolean"}, - "header_denylist": {"type": "array", "items": {"type": "string"}}, - "proxy_env_denylist": { - "type": "array", - "items": {"type": "string"}, - }, - "env_allowlist": {"type": "array", "items": {"type": "string"}}, - }, - }, - "output": { - "type": "object", - "properties": { - "format": { - "type": "string", - "enum": ["json", "yaml", "csv", "xml"], - "description": "Output format for standardized reports", - }, - "directory": { - "type": "string", - "description": "Directory to save output files", - }, - "compress": { - "type": "boolean", - "description": "Whether to compress output files", - }, - "types": { - "type": "array", - "items": { - "type": "string", - "enum": [ - "fuzzing_results", - "error_report", - "safety_summary", - "performance_metrics", - "configuration_dump", - ], - }, - "description": "Specific output types to generate", - }, - "schema": { - "type": "string", - "description": "Path to custom output schema file", - }, - "retention": { - "type": "object", - "properties": { - "days": { - "type": "integer", - "description": "Number of days to retain output files", - }, - "max_size": { - "type": "string", - "description": ( - "Maximum size of output directory " - "(e.g., '1GB', '500MB')" - ), - }, - }, - }, - }, - }, - }, - } diff --git a/mcp_fuzzer/config/schema/__init__.py b/mcp_fuzzer/config/schema/__init__.py new file mode 100644 index 0000000..9e033f8 --- /dev/null +++ b/mcp_fuzzer/config/schema/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 +"""Configuration schema validation.""" + +from .composer import get_config_schema + +__all__ = ["get_config_schema"] + diff --git a/mcp_fuzzer/config/schema/builders.py b/mcp_fuzzer/config/schema/builders.py new file mode 100644 index 0000000..b4b41f0 --- /dev/null +++ b/mcp_fuzzer/config/schema/builders.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python3 +"""Schema builder functions for configuration validation.""" + +from __future__ import annotations + +from typing import Any + + +def build_timeout_schema() -> dict[str, Any]: + """Build schema for timeout-related configuration.""" + return { + "timeout": {"type": "number", "description": "Default timeout in seconds"}, + "tool_timeout": { + "type": "number", + "description": "Tool-specific timeout in seconds", + }, + "http_timeout": { + "type": "number", + "description": "HTTP transport timeout in seconds", + }, + "sse_timeout": { + "type": "number", + "description": "SSE transport timeout in seconds", + }, + "stdio_timeout": { + "type": "number", + "description": "STDIO transport timeout in seconds", + }, + } + + +def build_basic_schema() -> dict[str, Any]: + """Build schema for basic configuration properties.""" + return { + "log_level": { + "type": "string", + "enum": ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + }, + "safety_enabled": { + "type": "boolean", + "description": "Whether safety features are enabled", + }, + "fs_root": { + "type": "string", + "description": "Root directory for file operations", + }, + } + + +def build_fuzzing_schema() -> dict[str, Any]: + """Build schema for fuzzing-related configuration.""" + return { + "mode": {"type": "string", "enum": ["tools", "protocol", "both"]}, + "phase": {"type": "string", "enum": ["realistic", "aggressive", "both"]}, + "protocol": { + "type": "string", + "enum": ["http", "https", "sse", "stdio", "streamablehttp"], + }, + "endpoint": {"type": "string", "description": "Server endpoint URL"}, + "runs": {"type": "integer", "description": "Number of fuzzing runs"}, + "runs_per_type": { + "type": "integer", + "description": "Number of runs per protocol type", + }, + "protocol_type": { + "type": "string", + "description": "Specific protocol type to fuzz", + }, + "max_concurrency": { + "type": "integer", + "description": "Maximum concurrent operations", + }, + } + + +def build_network_schema() -> dict[str, Any]: + """Build schema for network-related configuration.""" + return { + "no_network": {"type": "boolean", "description": "Disable network access"}, + "allow_hosts": { + "type": "array", + "items": {"type": "string"}, + "description": "List of allowed hosts", + }, + } + + +def build_auth_schema() -> dict[str, Any]: + """Build schema for authentication configuration.""" + return { + "auth": { + "type": "object", + "properties": { + "providers": { + "type": "array", + "items": { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": ["api_key", "basic", "oauth", "custom"], + }, + "id": {"type": "string"}, + "config": {"type": "object"}, + }, + "required": ["type", "id"], + }, + }, + "mappings": { + "type": "object", + "additionalProperties": {"type": "string"}, + }, + }, + }, + } + + +def build_custom_transports_schema() -> dict[str, Any]: + """Build schema for custom transport configuration.""" + return { + "custom_transports": { + "type": "object", + "description": "Configuration for custom transport mechanisms", + "patternProperties": { + "^[a-zA-Z][a-zA-Z0-9_]*$": { + "type": "object", + "properties": { + "module": { + "type": "string", + "description": "Python module containing transport", + }, + "class": { + "type": "string", + "description": "Transport class name", + }, + "description": { + "type": "string", + "description": "Human-readable description", + }, + "factory": { + "type": "string", + "description": "Dotted path to factory function " + "(e.g., pkg.mod.create_transport)", + }, + "config_schema": { + "type": "object", + "description": "JSON schema for transport config", + }, + }, + "additionalProperties": False, + "required": ["module", "class"], + } + }, + "additionalProperties": False, + }, + } + + +def build_safety_schema() -> dict[str, Any]: + """Build schema for safety configuration.""" + return { + "safety": { + "type": "object", + "properties": { + "enabled": {"type": "boolean"}, + "local_hosts": {"type": "array", "items": {"type": "string"}}, + "no_network": {"type": "boolean"}, + "header_denylist": {"type": "array", "items": {"type": "string"}}, + "proxy_env_denylist": { + "type": "array", + "items": {"type": "string"}, + }, + "env_allowlist": {"type": "array", "items": {"type": "string"}}, + }, + }, + } + + +def build_output_schema() -> dict[str, Any]: + """Build schema for output configuration.""" + return { + "output": { + "type": "object", + "properties": { + "format": { + "type": "string", + "enum": ["json", "yaml", "csv", "xml"], + "description": "Output format for standardized reports", + }, + "directory": { + "type": "string", + "description": "Directory to save output files", + }, + "compress": { + "type": "boolean", + "description": "Whether to compress output files", + }, + "types": { + "type": "array", + "items": { + "type": "string", + "enum": [ + "fuzzing_results", + "error_report", + "safety_summary", + "performance_metrics", + "configuration_dump", + ], + }, + "description": "Specific output types to generate", + }, + "schema": { + "type": "string", + "description": "Path to custom output schema file", + }, + "retention": { + "type": "object", + "properties": { + "days": { + "type": "integer", + "description": "Number of days to retain output files", + }, + "max_size": { + "type": "string", + "description": ( + "Maximum size of output directory " + "(e.g., '1GB', '500MB')" + ), + }, + }, + }, + }, + }, + } + diff --git a/mcp_fuzzer/config/schema/composer.py b/mcp_fuzzer/config/schema/composer.py new file mode 100644 index 0000000..2700dd3 --- /dev/null +++ b/mcp_fuzzer/config/schema/composer.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +"""Schema composer that combines all schema builders.""" + +from __future__ import annotations + +from typing import Any + +from .builders import ( + build_auth_schema, + build_basic_schema, + build_custom_transports_schema, + build_fuzzing_schema, + build_network_schema, + build_output_schema, + build_safety_schema, + build_timeout_schema, +) + + +def get_config_schema() -> dict[str, Any]: + """Return the JSON schema describing the configuration structure. + + The schema is built by composing smaller schema builders for logical + groupings of configuration properties. + + Returns: + Complete JSON schema dictionary for configuration validation + """ + properties = {} + properties.update(build_timeout_schema()) + properties.update(build_basic_schema()) + properties.update(build_fuzzing_schema()) + properties.update(build_network_schema()) + properties.update(build_auth_schema()) + properties.update(build_custom_transports_schema()) + properties.update(build_safety_schema()) + properties.update(build_output_schema()) + + return { + "type": "object", + "properties": properties, + } + diff --git a/mcp_fuzzer/fuzz_engine/runtime/watchdog.py b/mcp_fuzzer/fuzz_engine/runtime/watchdog.py index fc7129d..e23f018 100644 --- a/mcp_fuzzer/fuzz_engine/runtime/watchdog.py +++ b/mcp_fuzzer/fuzz_engine/runtime/watchdog.py @@ -19,7 +19,12 @@ import time from typing import Any, Awaitable, Callable, Protocol -from ...config.constants import PROCESS_FORCE_KILL_TIMEOUT, PROCESS_TERMINATION_TIMEOUT +# Import constants directly from config (constants are values, not behavior) +# Behavior (functions/classes) should go through client mediator +from ...config.core.constants import ( + PROCESS_FORCE_KILL_TIMEOUT, + PROCESS_TERMINATION_TIMEOUT, +) from ...exceptions import MCPError, ProcessStopError, WatchdogStartError from .config import WatchdogConfig from .registry import ProcessRecord, ProcessRegistry diff --git a/mcp_fuzzer/fuzz_engine/strategy/realistic/protocol_type_strategy.py b/mcp_fuzzer/fuzz_engine/strategy/realistic/protocol_type_strategy.py index 213db37..095704e 100644 --- a/mcp_fuzzer/fuzz_engine/strategy/realistic/protocol_type_strategy.py +++ b/mcp_fuzzer/fuzz_engine/strategy/realistic/protocol_type_strategy.py @@ -10,7 +10,8 @@ from typing import Any from hypothesis import strategies as st -from ....config import DEFAULT_PROTOCOL_VERSION +# Import constants directly from config (constants are values, not behavior) +from ....config.core.constants import DEFAULT_PROTOCOL_VERSION # Helper to keep URIs local-only SAFE_FILE_URIS = [ diff --git a/mcp_fuzzer/reports/reporter/__init__.py b/mcp_fuzzer/reports/reporter/__init__.py index ae0207f..11841c9 100644 --- a/mcp_fuzzer/reports/reporter/__init__.py +++ b/mcp_fuzzer/reports/reporter/__init__.py @@ -70,9 +70,11 @@ def __init__( """ # Dependency injection: use provided config or fall back to global if config_provider is None: - from ...config import config as default_config + from ...client.adapters import config_mediator - config_provider = default_config + # Use config_mediator as the provider + # (it implements dict-like interface via get()) + config_provider = config_mediator resolved_config = config or ReporterConfig.from_provider( provider=config_provider, diff --git a/mcp_fuzzer/safety_system/policy.py b/mcp_fuzzer/safety_system/policy.py index 618f1f1..e918dc8 100644 --- a/mcp_fuzzer/safety_system/policy.py +++ b/mcp_fuzzer/safety_system/policy.py @@ -12,7 +12,9 @@ import os from collections.abc import Iterable -from ..config import ( +# Import constants directly from config (constants are values, not behavior) +# Behavior (functions/classes) should go through client mediator +from ..config.core.constants import ( SAFETY_LOCAL_HOSTS, SAFETY_NO_NETWORK_DEFAULT, SAFETY_PROXY_ENV_DENYLIST, diff --git a/mcp_fuzzer/transport/http.py b/mcp_fuzzer/transport/http.py index 417def1..db91cb5 100644 --- a/mcp_fuzzer/transport/http.py +++ b/mcp_fuzzer/transport/http.py @@ -10,7 +10,8 @@ from .base import TransportProtocol from .mixins import NetworkTransportMixin, ResponseParsingMixin from ..fuzz_engine.runtime import ProcessManager, WatchdogConfig -from ..config import ( +# Import constants directly from config (constants are values, not behavior) +from ..config.core.constants import ( JSON_CONTENT_TYPE, DEFAULT_HTTP_ACCEPT, ) diff --git a/mcp_fuzzer/transport/stdio.py b/mcp_fuzzer/transport/stdio.py index c9aa0a2..80b8e17 100644 --- a/mcp_fuzzer/transport/stdio.py +++ b/mcp_fuzzer/transport/stdio.py @@ -20,7 +20,8 @@ ) from ..fuzz_engine.runtime import ProcessManager, WatchdogConfig from ..safety_system.policy import sanitize_subprocess_env -from ..config.constants import PROCESS_WAIT_TIMEOUT +# Import constants directly from config (constants are values, not behavior) +from ..config.core.constants import PROCESS_WAIT_TIMEOUT from .manager import TransportManager class StdioTransport(TransportProtocol): diff --git a/mcp_fuzzer/transport/streamable_http.py b/mcp_fuzzer/transport/streamable_http.py index 2e3be62..a0ba714 100644 --- a/mcp_fuzzer/transport/streamable_http.py +++ b/mcp_fuzzer/transport/streamable_http.py @@ -4,7 +4,8 @@ from typing import Any import httpx -from ..config import ( +# Import constants directly from config (constants are values, not behavior) +from ..config.core.constants import ( DEFAULT_PROTOCOL_VERSION, CONTENT_TYPE_HEADER, JSON_CONTENT_TYPE, diff --git a/tests/integration/test_custom_transport.py b/tests/integration/test_custom_transport.py index c1af8e7..58c748c 100644 --- a/tests/integration/test_custom_transport.py +++ b/tests/integration/test_custom_transport.py @@ -5,7 +5,8 @@ import os from pathlib import Path -from mcp_fuzzer.config import apply_config_file, load_custom_transports +from mcp_fuzzer.client.adapters import config_mediator +from mcp_fuzzer.config.extensions.transports import load_custom_transports from mcp_fuzzer.exceptions import ConfigFileError, TransportRegistrationError from mcp_fuzzer.transport import create_transport, register_custom_transport from mcp_fuzzer.transport.base import TransportProtocol @@ -97,7 +98,7 @@ def test_config_file_custom_transport_loading(self): try: # Load config and custom transports from the file we wrote - assert apply_config_file(config_path=config_path) is True + assert config_mediator.apply_file(config_path=config_path) is True # Test that transport was loaded from mcp_fuzzer.transport import list_custom_transports diff --git a/tests/integration/test_standardized_output.py b/tests/integration/test_standardized_output.py index 5881e61..898e172 100644 --- a/tests/integration/test_standardized_output.py +++ b/tests/integration/test_standardized_output.py @@ -123,10 +123,10 @@ def test_output_directory_structure(self): def test_configuration_driven_output(self): """Test that output generation respects configuration settings.""" - from mcp_fuzzer.config import config + from mcp_fuzzer.client.adapters import config_mediator # Set output configuration - config.update({ + config_mediator.update({ "output": { "format": "json", "directory": self.temp_dir, diff --git a/tests/unit/cli/test_cli.py b/tests/unit/cli/test_cli.py index d255a14..226b2ea 100644 --- a/tests/unit/cli/test_cli.py +++ b/tests/unit/cli/test_cli.py @@ -268,7 +268,7 @@ def test_build_cli_config_merges_and_returns_cli_config(): def test_handle_validate_config(monkeypatch): validator = ValidationManager() with patch( - "mcp_fuzzer.cli.validators.load_config_file" + "mcp_fuzzer.cli.validators.config_mediator.load_file" ) as mock_load: validator.validate_config_file("config.yml") mock_load.assert_called_once_with("config.yml") @@ -545,7 +545,7 @@ def test_run_cli_unexpected_error_debug(monkeypatch, caplog): def test_build_cli_config_uses_config_file(monkeypatch): args = _base_args(config="custom.yml", endpoint=None) with patch( - "mcp_fuzzer.cli.config_merge.load_config_file", + "mcp_fuzzer.cli.config_merge.config_mediator.load_file", return_value={ "endpoint": "http://conf", "runs": 42, @@ -561,7 +561,7 @@ def test_build_cli_config_handles_apply_config_error(caplog): caplog.set_level(logging.DEBUG) args = _base_args(config=None) with patch( - "mcp_fuzzer.cli.config_merge.apply_config_file", + "mcp_fuzzer.cli.config_merge.config_mediator.apply_file", side_effect=Exception("fail"), ): cli_config = build_cli_config(args) @@ -571,7 +571,7 @@ def test_build_cli_config_handles_apply_config_error(caplog): def test_build_cli_config_raises_config_error(): args = _base_args(config="bad.yml") with patch( - "mcp_fuzzer.cli.config_merge.load_config_file", + "mcp_fuzzer.cli.config_merge.config_mediator.load_file", side_effect=ValueError("bad"), ): with pytest.raises(Exception): diff --git a/tests/unit/client/test_config_adapter.py b/tests/unit/client/test_config_adapter.py new file mode 100644 index 0000000..b73cd1f --- /dev/null +++ b/tests/unit/client/test_config_adapter.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +"""Unit tests for config adapter and port.""" + +from __future__ import annotations + +from pathlib import Path +from textwrap import dedent +from unittest.mock import Mock, patch + +import pytest + +from mcp_fuzzer.client.adapters import ConfigAdapter, config_mediator +from mcp_fuzzer.client.ports import ConfigPort +from mcp_fuzzer.exceptions import ConfigFileError + + +def test_config_adapter_implements_port(): + """Test that ConfigAdapter implements ConfigPort interface.""" + adapter = ConfigAdapter() + assert isinstance(adapter, ConfigPort) + + +def test_config_adapter_get_set(): + """Test ConfigAdapter get and set methods.""" + adapter = ConfigAdapter() + adapter.set("test_key", "test_value") + assert adapter.get("test_key") == "test_value" + assert adapter.get("nonexistent", "default") == "default" + + +def test_config_adapter_update(): + """Test ConfigAdapter update method.""" + adapter = ConfigAdapter() + adapter.update({"key1": "value1", "key2": "value2"}) + assert adapter.get("key1") == "value1" + assert adapter.get("key2") == "value2" + + +def test_config_adapter_load_file(tmp_path): + """Test ConfigAdapter load_file method.""" + config_file = tmp_path / "test.yaml" + config_file.write_text("timeout: 30.0\nlog_level: DEBUG") + + adapter = ConfigAdapter() + config_data = adapter.load_file(str(config_file)) + assert config_data["timeout"] == 30.0 + assert config_data["log_level"] == "DEBUG" + + +def test_config_adapter_load_file_not_found(): + """Test ConfigAdapter load_file raises error for missing file.""" + adapter = ConfigAdapter() + with pytest.raises(ConfigFileError): + adapter.load_file("/nonexistent/path.yaml") + + +def test_config_adapter_apply_file(tmp_path): + """Test ConfigAdapter apply_file method.""" + config_file = tmp_path / "test.yaml" + config_file.write_text("timeout: 45.0\nlog_level: INFO") + + adapter = ConfigAdapter() + result = adapter.apply_file(config_path=str(config_file)) + assert result is True + assert adapter.get("timeout") == 45.0 + assert adapter.get("log_level") == "INFO" + + +def test_config_adapter_apply_file_not_found(): + """Test ConfigAdapter apply_file returns False for missing file.""" + adapter = ConfigAdapter() + result = adapter.apply_file(config_path="/nonexistent/path.yaml") + assert result is False + + +def test_config_adapter_get_schema(): + """Test ConfigAdapter get_schema method.""" + adapter = ConfigAdapter() + schema = adapter.get_schema() + assert isinstance(schema, dict) + assert schema["type"] == "object" + assert "properties" in schema + + +def test_config_adapter_custom_instance(): + """Test ConfigAdapter with custom config instance.""" + mock_config = Mock() + mock_config.get = Mock(return_value="custom_value") + mock_config.set = Mock() + mock_config.update = Mock() + + adapter = ConfigAdapter(config_instance=mock_config) + assert adapter.get("test_key") == "custom_value" + mock_config.get.assert_called_once_with("test_key", None) + + +def test_config_mediator_is_global_instance(): + """Test that config_mediator is a global ConfigAdapter instance.""" + assert isinstance(config_mediator, ConfigAdapter) + assert isinstance(config_mediator, ConfigPort) + + +def test_config_mediator_functionality(tmp_path): + """Test that global config_mediator works correctly.""" + config_file = tmp_path / "test.yaml" + config_file.write_text("timeout: 60.0") + + # Test apply_file + result = config_mediator.apply_file(config_path=str(config_file)) + assert result is True + + # Test get + assert config_mediator.get("timeout") == 60.0 + + # Test set + config_mediator.set("custom_key", "custom_value") + assert config_mediator.get("custom_key") == "custom_value" + + # Test update + config_mediator.update({"key1": "val1", "key2": "val2"}) + assert config_mediator.get("key1") == "val1" + assert config_mediator.get("key2") == "val2" + + +def test_config_mediator_as_mapping(): + """Test that config_mediator can be used as a Mapping-like object.""" + config_mediator.set("test_key", "test_value") + # Should work with .get() method (Mapping interface) + assert config_mediator.get("test_key") == "test_value" + assert config_mediator.get("nonexistent", "default") == "default" + diff --git a/tests/unit/config/test_config_loader.py b/tests/unit/config/test_config_loader.py index 5a992dc..fb7e8c6 100644 --- a/tests/unit/config/test_config_loader.py +++ b/tests/unit/config/test_config_loader.py @@ -13,11 +13,13 @@ from mcp_fuzzer.config import ( ConfigLoader, + ConfigSearchParams, apply_config_file, find_config_file, get_config_schema, load_config_file, ) +from mcp_fuzzer.config.core.manager import Configuration from mcp_fuzzer.exceptions import ConfigFileError @@ -101,8 +103,8 @@ def test_load_config_file_invalid_yaml(tmp_path): load_config_file(str(invalid)) -@patch("mcp_fuzzer.config.loader.load_custom_transports") -@patch("mcp_fuzzer.config.loader.config") +@patch("mcp_fuzzer.config.loading.loader.load_custom_transports") +@patch("mcp_fuzzer.config.loading.loader.config") def test_apply_config_file_updates_global_state( mock_config, mock_transports, config_files ): @@ -114,8 +116,8 @@ def test_apply_config_file_updates_global_state( assert mock_config.update.call_count == 1 -@patch("mcp_fuzzer.config.loader.load_custom_transports") -@patch("mcp_fuzzer.config.loader.config") +@patch("mcp_fuzzer.config.loading.loader.load_custom_transports") +@patch("mcp_fuzzer.config.loading.loader.config") def test_apply_config_file_returns_false_when_missing(mock_config, mock_transports): result = apply_config_file(config_path="/nope.yaml") assert result is False @@ -155,36 +157,104 @@ def test_config_loader_load_returns_none_when_not_found(): def test_config_loader_apply_merges_data(): parser = Mock(return_value={"log_level": "INFO"}) + mock_config = Mock(spec=Configuration) loader = ConfigLoader( discoverer=lambda *_: "config.yaml", parser=parser, transport_loader=Mock(), + config_instance=mock_config, ) - with patch("mcp_fuzzer.config.loader.config") as mock_config: - assert loader.apply() is True - mock_config.update.assert_called_once_with({"log_level": "INFO"}) + assert loader.apply() is True + mock_config.update.assert_called_once_with({"log_level": "INFO"}) def test_config_loader_apply_handles_parser_errors(): parser = Mock(side_effect=ConfigFileError("boom")) + mock_config = Mock(spec=Configuration) loader = ConfigLoader( discoverer=lambda *_: "config.yaml", parser=parser, transport_loader=Mock(), + config_instance=mock_config, ) - with patch("mcp_fuzzer.config.loader.config") as mock_config: - assert loader.apply() is False - mock_config.update.assert_not_called() + assert loader.apply() is False + mock_config.update.assert_not_called() def test_config_loader_apply_handles_transport_errors(): parser = Mock(return_value={"timeout": 5}) transport_loader = Mock(side_effect=ConfigFileError("bad transport")) + mock_config = Mock(spec=Configuration) loader = ConfigLoader( discoverer=lambda *_: "config.yaml", parser=parser, transport_loader=transport_loader, + config_instance=mock_config, + ) + assert loader.apply() is False + mock_config.update.assert_not_called() + + +def test_config_loader_load_logs_at_debug_level(config_files): + """Test that loading configuration logs at DEBUG level.""" + import logging + + with patch("mcp_fuzzer.config.loading.loader.logger") as mock_logger: + loader = ConfigLoader() + loader.load(config_path=config_files["yaml_path"]) + # Should log at DEBUG level, not INFO + mock_logger.debug.assert_called() + mock_logger.info.assert_not_called() + + +def test_config_loader_apply_logs_failures(): + """Test that apply() logs failures at DEBUG level.""" + parser = Mock(side_effect=ConfigFileError("test error")) + mock_config = Mock(spec=Configuration) + loader = ConfigLoader( + discoverer=lambda *_: "config.yaml", + parser=parser, + transport_loader=Mock(), + config_instance=mock_config, ) - with patch("mcp_fuzzer.config.loader.config") as mock_config: - assert loader.apply() is False - mock_config.update.assert_not_called() + with patch("mcp_fuzzer.config.loading.loader.logger") as mock_logger: + result = loader.apply() + assert result is False + # Should log the failure at DEBUG level + mock_logger.debug.assert_called() + # Check that the log message contains the expected text + call_args_str = str(mock_logger.debug.call_args) + assert ( + "Failed to apply configuration" in call_args_str + or "test error" in call_args_str + ) + + +def test_config_loader_load_from_params(): + """Test ConfigLoader.load_from_params() method.""" + parser = Mock(return_value={"timeout": 30}) + params = ConfigSearchParams(config_path="test.yaml") + loader = ConfigLoader( + discoverer=lambda *args, **kwargs: "test.yaml", + parser=parser, + transport_loader=Mock(), + ) + data, path = loader.load_from_params(params) + assert data == {"timeout": 30} + assert path == "test.yaml" + + +def test_config_loader_apply_from_params(): + """Test ConfigLoader.apply_from_params() method.""" + parser = Mock(return_value={"log_level": "DEBUG"}) + mock_config = Mock(spec=Configuration) + params = ConfigSearchParams(config_path="test.yaml") + loader = ConfigLoader( + discoverer=lambda *args, **kwargs: "test.yaml", + parser=parser, + transport_loader=Mock(), + config_instance=mock_config, + ) + result = loader.apply_from_params(params) + assert result is True + mock_config.update.assert_called_once_with({"log_level": "DEBUG"}) diff --git a/tests/unit/config/test_constants.py b/tests/unit/config/test_constants.py new file mode 100644 index 0000000..3bbe276 --- /dev/null +++ b/tests/unit/config/test_constants.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +"""Unit tests for configuration constants.""" + +from __future__ import annotations + +import pytest + +from mcp_fuzzer.config import ( + DEFAULT_FORCE_KILL_TIMEOUT, + DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT, + DEFAULT_MAX_TOTAL_FUZZING_TIME, + DEFAULT_MAX_TOOL_TIME, + DEFAULT_PROTOCOL_RUNS_PER_TYPE, + DEFAULT_PROTOCOL_VERSION, + DEFAULT_TIMEOUT, + DEFAULT_TOOL_RUNS, + DEFAULT_TOOL_TIMEOUT, + PROCESS_CLEANUP_TIMEOUT, + PROCESS_FORCE_KILL_TIMEOUT, + PROCESS_TERMINATION_TIMEOUT, + PROCESS_WAIT_TIMEOUT, + SAFETY_ENV_ALLOWLIST, + SAFETY_HEADER_DENYLIST, + SAFETY_LOCAL_HOSTS, + SAFETY_NO_NETWORK_DEFAULT, + SAFETY_PROXY_ENV_DENYLIST, +) + + +def test_process_constants_exported(): + """Test that PROCESS_* constants are properly exported.""" + assert PROCESS_TERMINATION_TIMEOUT == 0.5 + assert PROCESS_FORCE_KILL_TIMEOUT == 1.0 + assert PROCESS_CLEANUP_TIMEOUT == 5.0 + assert PROCESS_WAIT_TIMEOUT == 1.0 + assert isinstance(PROCESS_TERMINATION_TIMEOUT, float) + assert isinstance(PROCESS_FORCE_KILL_TIMEOUT, float) + assert isinstance(PROCESS_CLEANUP_TIMEOUT, float) + assert isinstance(PROCESS_WAIT_TIMEOUT, float) + + +def test_safety_no_network_default(): + """Test that SAFETY_NO_NETWORK_DEFAULT is properly documented.""" + # The constant should be False (allow network by default) + # with a clear comment explaining it + assert SAFETY_NO_NETWORK_DEFAULT is False + assert isinstance(SAFETY_NO_NETWORK_DEFAULT, bool) + + +def test_safety_local_hosts(): + """Test that SAFETY_LOCAL_HOSTS contains expected local hosts.""" + assert "localhost" in SAFETY_LOCAL_HOSTS + assert "127.0.0.1" in SAFETY_LOCAL_HOSTS + assert "::1" in SAFETY_LOCAL_HOSTS + assert isinstance(SAFETY_LOCAL_HOSTS, set) + + +def test_safety_header_denylist(): + """Test that SAFETY_HEADER_DENYLIST contains sensitive headers.""" + assert "authorization" in SAFETY_HEADER_DENYLIST + assert "cookie" in SAFETY_HEADER_DENYLIST + assert isinstance(SAFETY_HEADER_DENYLIST, set) + + +def test_safety_proxy_env_denylist(): + """Test that SAFETY_PROXY_ENV_DENYLIST contains proxy env vars.""" + expected_vars = { + "HTTP_PROXY", + "HTTPS_PROXY", + "ALL_PROXY", + "NO_PROXY", + "http_proxy", + "https_proxy", + "all_proxy", + "no_proxy", + } + assert expected_vars.issubset(SAFETY_PROXY_ENV_DENYLIST) + assert isinstance(SAFETY_PROXY_ENV_DENYLIST, set) + + +def test_safety_env_allowlist(): + """Test that SAFETY_ENV_ALLOWLIST is a set.""" + assert isinstance(SAFETY_ENV_ALLOWLIST, set) + + +def test_default_timeout_values(): + """Test that default timeout values are positive floats.""" + assert DEFAULT_TIMEOUT > 0 + assert DEFAULT_TOOL_TIMEOUT > 0 + assert DEFAULT_MAX_TOOL_TIME > 0 + assert DEFAULT_MAX_TOTAL_FUZZING_TIME > 0 + assert DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT > 0 + assert DEFAULT_FORCE_KILL_TIMEOUT > 0 + assert all(isinstance(v, float) for v in [ + DEFAULT_TIMEOUT, + DEFAULT_TOOL_TIMEOUT, + DEFAULT_MAX_TOOL_TIME, + DEFAULT_MAX_TOTAL_FUZZING_TIME, + DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT, + DEFAULT_FORCE_KILL_TIMEOUT, + ]) + + +def test_default_run_counts(): + """Test that default run counts are positive integers.""" + assert DEFAULT_TOOL_RUNS > 0 + assert DEFAULT_PROTOCOL_RUNS_PER_TYPE > 0 + assert isinstance(DEFAULT_TOOL_RUNS, int) + assert isinstance(DEFAULT_PROTOCOL_RUNS_PER_TYPE, int) + + +def test_default_protocol_version(): + """Test that default protocol version is a string.""" + assert isinstance(DEFAULT_PROTOCOL_VERSION, str) + assert len(DEFAULT_PROTOCOL_VERSION) > 0 + diff --git a/tests/unit/config/test_discovery_transports.py b/tests/unit/config/test_discovery_transports.py index aede0a0..4720cc5 100644 --- a/tests/unit/config/test_discovery_transports.py +++ b/tests/unit/config/test_discovery_transports.py @@ -8,8 +8,12 @@ import pytest -from mcp_fuzzer.config.discovery import find_config_file -from mcp_fuzzer.config.transports import load_custom_transports +from mcp_fuzzer.config.loading.discovery import ( + find_config_file, + find_config_file_from_params, +) +from mcp_fuzzer.config.loading.search_params import ConfigSearchParams +from mcp_fuzzer.config.extensions.transports import load_custom_transports from mcp_fuzzer.exceptions import ConfigFileError from mcp_fuzzer.transport.base import TransportProtocol from mcp_fuzzer.transport.custom import list_custom_transports, registry @@ -47,14 +51,19 @@ def test_find_config_file_prefers_explicit_path(tmp_path): """Explicit config_path should be returned even if other files exist.""" path = tmp_path / "mcp-fuzzer.yaml" path.write_text("timeout: 5") - assert find_config_file(config_path=str(path), search_paths=[str(tmp_path)]) == str(path) + result = find_config_file( + config_path=str(path), search_paths=[str(tmp_path)] + ) + assert result == str(path) def test_find_config_file_search_paths(tmp_path): """Search paths should be honored when they contain a config file.""" path = tmp_path / "mcp-fuzzer.yml" path.write_text("timeout: 10\n") - result = find_config_file(search_paths=[str(tmp_path)], file_names=["mcp-fuzzer.yml"]) + result = find_config_file( + search_paths=[str(tmp_path)], file_names=["mcp-fuzzer.yml"] + ) assert result == str(path) @@ -102,3 +111,74 @@ def test_load_custom_transports_invalid_class_raises(): } } ) + + +def test_find_config_file_from_params(tmp_path): + """Test find_config_file_from_params with ConfigSearchParams.""" + path = tmp_path / "mcp-fuzzer.yaml" + path.write_text("timeout: 5") + params = ConfigSearchParams( + config_path=str(path), + search_paths=[str(tmp_path)], + file_names=["mcp-fuzzer.yaml"], + ) + result = find_config_file_from_params(params) + assert result == str(path) + + +def test_find_config_file_from_params_search(tmp_path): + """Test find_config_file_from_params with search paths.""" + path = tmp_path / "mcp-fuzzer.yml" + path.write_text("timeout: 10") + params = ConfigSearchParams( + config_path=None, + search_paths=[str(tmp_path)], + file_names=["mcp-fuzzer.yml"], + ) + result = find_config_file_from_params(params) + assert result == str(path) + + +def test_load_custom_transports_non_class_raises(): + """Non-class attributes should raise ConfigFileError with TypeError handling.""" + config_data = { + "custom_transports": { + "invalid": { + "module": __name__, + # This is a function, not a class + "class": "test_load_custom_transports_non_class_raises", + } + } + } + with pytest.raises(ConfigFileError, match="is not a class"): + load_custom_transports(config_data) + + +def test_load_custom_transports_logs_with_percent_formatting(monkeypatch): + """Test that transport loading uses proper logging format (not f-strings).""" + from unittest.mock import Mock + + mock_logger = Mock() + monkeypatch.setattr("mcp_fuzzer.config.extensions.transports.logger", mock_logger) + + config_data = { + "custom_transports": { + "dummy": { + "module": __name__, + "class": "DummyTransport", + "description": "Test transport", + } + } + } + + load_custom_transports(config_data) + + # Verify logger.info was called with separate arguments (not f-string) + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args + # Should be called with format string and separate args (not f-string) + assert len(call_args[0]) == 4 # format string + 3 args + assert call_args[0][0] == "Loaded custom transport '%s' from %s.%s" + assert call_args[0][1] == "dummy" + assert call_args[0][2] == __name__ + assert call_args[0][3] == "DummyTransport" diff --git a/tests/unit/config/test_manager.py b/tests/unit/config/test_manager.py new file mode 100644 index 0000000..0027507 --- /dev/null +++ b/tests/unit/config/test_manager.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +"""Unit tests for configuration manager.""" + +from __future__ import annotations + +import os +from unittest.mock import patch + +import pytest + +from mcp_fuzzer.config.core.manager import ( + Configuration, + _get_bool_from_env, + _get_float_from_env, + config, +) + + +def test_get_float_from_env_with_value(): + """Test _get_float_from_env with valid environment variable.""" + with patch.dict(os.environ, {"TEST_FLOAT": "42.5"}): + assert _get_float_from_env("TEST_FLOAT", 0.0) == 42.5 + + +def test_get_float_from_env_with_default(): + """Test _get_float_from_env when variable is not set.""" + with patch.dict(os.environ, {}, clear=True): + assert _get_float_from_env("TEST_FLOAT", 10.0) == 10.0 + + +def test_get_float_from_env_with_invalid_value(): + """Test _get_float_from_env with invalid value falls back to default.""" + with patch.dict(os.environ, {"TEST_FLOAT": "not_a_number"}): + assert _get_float_from_env("TEST_FLOAT", 5.0) == 5.0 + + +def test_get_bool_from_env_true_values(): + """Test _get_bool_from_env with various true values.""" + true_values = ["1", "true", "True", "TRUE", "yes", "YES", "on", "ON"] + for val in true_values: + with patch.dict(os.environ, {"TEST_BOOL": val}): + assert _get_bool_from_env("TEST_BOOL", False) is True + + +def test_get_bool_from_env_false_values(): + """Test _get_bool_from_env with false values.""" + false_values = ["0", "false", "no", "off", "anything_else"] + for val in false_values: + with patch.dict(os.environ, {"TEST_BOOL": val}): + assert _get_bool_from_env("TEST_BOOL", True) is False + + +def test_get_bool_from_env_with_default(): + """Test _get_bool_from_env when variable is not set.""" + with patch.dict(os.environ, {}, clear=True): + assert _get_bool_from_env("TEST_BOOL", True) is True + assert _get_bool_from_env("TEST_BOOL", False) is False + + +def test_configuration_loads_from_env(): + """Test Configuration loads values from environment variables.""" + env_vars = { + "MCP_FUZZER_TIMEOUT": "60.0", + "MCP_FUZZER_LOG_LEVEL": "DEBUG", + "MCP_FUZZER_SAFETY_ENABLED": "true", + "MCP_FUZZER_FS_ROOT": "/custom/path", + "MCP_FUZZER_HTTP_TIMEOUT": "45.0", + "MCP_FUZZER_SSE_TIMEOUT": "50.0", + "MCP_FUZZER_STDIO_TIMEOUT": "55.0", + } + with patch.dict(os.environ, env_vars): + cfg = Configuration() + assert cfg.get("timeout") == 60.0 + assert cfg.get("log_level") == "DEBUG" + assert cfg.get("safety_enabled") is True + assert cfg.get("fs_root") == "/custom/path" + assert cfg.get("http_timeout") == 45.0 + assert cfg.get("sse_timeout") == 50.0 + assert cfg.get("stdio_timeout") == 55.0 + + +def test_configuration_uses_defaults(): + """Test Configuration uses defaults when env vars are not set.""" + with patch.dict(os.environ, {}, clear=True): + cfg = Configuration() + assert cfg.get("timeout") == 30.0 + assert cfg.get("log_level") == "INFO" + assert cfg.get("safety_enabled") is False + + +def test_configuration_get_with_default(): + """Test Configuration.get() with custom default.""" + cfg = Configuration() + assert cfg.get("nonexistent_key", "default_value") == "default_value" + + +def test_configuration_set(): + """Test Configuration.set() method.""" + cfg = Configuration() + cfg.set("custom_key", "custom_value") + assert cfg.get("custom_key") == "custom_value" + + +def test_configuration_update(): + """Test Configuration.update() method.""" + cfg = Configuration() + cfg.update({"key1": "value1", "key2": "value2"}) + assert cfg.get("key1") == "value1" + assert cfg.get("key2") == "value2" + + +def test_global_config_instance(): + """Test that global config instance exists and is usable.""" + assert config is not None + assert isinstance(config, Configuration) + # Should have default values + assert config.get("timeout") is not None + diff --git a/tests/unit/config/test_parser.py b/tests/unit/config/test_parser.py new file mode 100644 index 0000000..9a58984 --- /dev/null +++ b/tests/unit/config/test_parser.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +"""Unit tests for configuration parser.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import mock_open, patch + +import pytest +import yaml + +from mcp_fuzzer.config.loading.parser import load_config_file +from mcp_fuzzer.exceptions import ConfigFileError + + +def test_load_config_file_success(tmp_path): + """Test successful loading of YAML config file.""" + config_path = tmp_path / "test.yaml" + config_path.write_text("timeout: 30.0\nlog_level: DEBUG") + + result = load_config_file(str(config_path)) + assert result["timeout"] == 30.0 + assert result["log_level"] == "DEBUG" + + +def test_load_config_file_not_found(): + """Test that missing file raises ConfigFileError.""" + with pytest.raises(ConfigFileError, match="Configuration file not found"): + load_config_file("/nonexistent/path.yaml") + + +def test_load_config_file_invalid_extension(tmp_path): + """Test that non-YAML files raise ConfigFileError.""" + txt_file = tmp_path / "config.txt" + txt_file.write_text("timeout: 30") + with pytest.raises(ConfigFileError, match="Unsupported configuration file format"): + load_config_file(str(txt_file)) + + +def test_load_config_file_invalid_yaml(tmp_path): + """Test that invalid YAML raises ConfigFileError with proper message.""" + invalid_yaml = tmp_path / "invalid.yaml" + invalid_yaml.write_text("timeout: [1,") # Invalid YAML syntax + + with pytest.raises(ConfigFileError, match="Error parsing YAML"): + load_config_file(str(invalid_yaml)) + + +def test_load_config_file_permission_error(tmp_path): + """Test that permission errors raise ConfigFileError.""" + config_path = tmp_path / "test.yaml" + config_path.write_text("timeout: 30") + + with patch("builtins.open", side_effect=PermissionError("Access denied")): + with pytest.raises(ConfigFileError, match="Permission denied"): + load_config_file(str(config_path)) + + +def test_load_config_file_exception_formatting(): + """Test exceptions formatted without redundant str() conversion.""" + config_path = "/test/path.yaml" + + # Mock file exists check + with patch("os.path.isfile", return_value=True): + # Mock YAML parsing error + yaml_error = yaml.YAMLError("YAML syntax error") + with patch("builtins.open", mock_open(read_data="invalid: yaml: [")): + with patch("yaml.safe_load", side_effect=yaml_error): + with pytest.raises(ConfigFileError) as exc_info: + load_config_file(config_path) + # Exception message should contain the error without redundant str() + assert "YAML syntax error" in str(exc_info.value) + assert config_path in str(exc_info.value) + + +def test_load_config_file_empty_file(tmp_path): + """Test that empty file returns empty dict.""" + empty_file = tmp_path / "empty.yaml" + empty_file.write_text("") + + result = load_config_file(str(empty_file)) + assert result == {} + + +def test_load_config_file_none_yaml_result(tmp_path): + """Test that YAML file with only comments returns empty dict.""" + comment_file = tmp_path / "comments.yaml" + comment_file.write_text("# Just a comment\n# Another comment") + + result = load_config_file(str(comment_file)) + assert result == {} + diff --git a/tests/unit/config/test_schema_builders.py b/tests/unit/config/test_schema_builders.py new file mode 100644 index 0000000..feebd55 --- /dev/null +++ b/tests/unit/config/test_schema_builders.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +"""Unit tests for schema builder functions.""" + +from __future__ import annotations + +import pytest + +from mcp_fuzzer.config.schema.builders import ( + build_auth_schema, + build_basic_schema, + build_custom_transports_schema, + build_fuzzing_schema, + build_network_schema, + build_output_schema, + build_safety_schema, + build_timeout_schema, +) +from mcp_fuzzer.config.schema.composer import get_config_schema + + +def test_build_timeout_schema(): + """Test that timeout schema includes all timeout-related fields.""" + schema = build_timeout_schema() + assert "timeout" in schema + assert "tool_timeout" in schema + assert "http_timeout" in schema + assert "sse_timeout" in schema + assert "stdio_timeout" in schema + assert all(field["type"] == "number" for field in schema.values()) + + +def testbuild_basic_schema(): + """Test that basic schema includes log level, safety, and fs_root.""" + schema = build_basic_schema() + assert "log_level" in schema + assert "safety_enabled" in schema + assert "fs_root" in schema + assert schema["log_level"]["type"] == "string" + assert schema["safety_enabled"]["type"] == "boolean" + + +def testbuild_fuzzing_schema(): + """Test that fuzzing schema includes mode, phase, protocol, etc.""" + schema = build_fuzzing_schema() + assert "mode" in schema + assert "phase" in schema + assert "protocol" in schema + assert "endpoint" in schema + assert "runs" in schema + assert "runs_per_type" in schema + assert "max_concurrency" in schema + + +def testbuild_network_schema(): + """Test that network schema includes network-related fields.""" + schema = build_network_schema() + assert "no_network" in schema + assert "allow_hosts" in schema + assert schema["no_network"]["type"] == "boolean" + assert schema["allow_hosts"]["type"] == "array" + + +def testbuild_auth_schema(): + """Test that auth schema includes auth configuration.""" + schema = build_auth_schema() + assert "auth" in schema + auth_schema = schema["auth"] + assert auth_schema["type"] == "object" + assert "providers" in auth_schema["properties"] + assert "mappings" in auth_schema["properties"] + + +def testbuild_custom_transports_schema(): + """Test that custom transports schema is properly structured.""" + schema = build_custom_transports_schema() + assert "custom_transports" in schema + transport_schema = schema["custom_transports"] + assert transport_schema["type"] == "object" + assert "patternProperties" in transport_schema + assert "additionalProperties" in transport_schema + assert transport_schema["additionalProperties"] is False + + +def testbuild_safety_schema(): + """Test that safety schema includes all safety-related fields.""" + schema = build_safety_schema() + assert "safety" in schema + safety_schema = schema["safety"] + assert safety_schema["type"] == "object" + props = safety_schema["properties"] + assert "enabled" in props + assert "local_hosts" in props + assert "no_network" in props + assert "header_denylist" in props + + +def testbuild_output_schema(): + """Test that output schema includes output configuration.""" + schema = build_output_schema() + assert "output" in schema + output_schema = schema["output"] + assert output_schema["type"] == "object" + props = output_schema["properties"] + assert "format" in props + assert "directory" in props + assert "compress" in props + assert "types" in props + assert "retention" in props + + +def test_get_config_schema_composition(): + """Test that get_config_schema composes all builder functions.""" + schema = get_config_schema() + assert schema["type"] == "object" + assert "properties" in schema + + properties = schema["properties"] + # Check that all sections are included + assert "timeout" in properties + assert "log_level" in properties + assert "mode" in properties + assert "no_network" in properties + assert "auth" in properties + assert "custom_transports" in properties + assert "safety" in properties + assert "output" in properties + + +def test_get_config_schema_structure(): + """Test that the complete schema has proper JSON schema structure.""" + schema = get_config_schema() + assert isinstance(schema, dict) + assert "type" in schema + assert "properties" in schema + assert schema["type"] == "object" + assert isinstance(schema["properties"], dict) + + +def test_schema_builders_are_independent(): + """Test that schema builders can be called independently.""" + timeout = build_timeout_schema() + basic = build_basic_schema() + fuzzing = build_fuzzing_schema() + + # Should not have overlapping keys + assert not set(timeout.keys()) & set(basic.keys()) + assert not set(timeout.keys()) & set(fuzzing.keys()) + assert not set(basic.keys()) & set(fuzzing.keys()) + diff --git a/tests/unit/config/test_search_params.py b/tests/unit/config/test_search_params.py new file mode 100644 index 0000000..65c5721 --- /dev/null +++ b/tests/unit/config/test_search_params.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +"""Unit tests for ConfigSearchParams dataclass.""" + +from __future__ import annotations + +import pytest + +from mcp_fuzzer.config.loading.search_params import ConfigSearchParams + + +def test_config_search_params_defaults(): + """Test that ConfigSearchParams has sensible defaults.""" + params = ConfigSearchParams() + assert params.config_path is None + assert params.search_paths is None + assert params.file_names is None + + +def test_config_search_params_with_values(): + """Test ConfigSearchParams with provided values.""" + params = ConfigSearchParams( + config_path="/path/to/config.yaml", + search_paths=["/dir1", "/dir2"], + file_names=["config.yml", "config.yaml"], + ) + assert params.config_path == "/path/to/config.yaml" + assert params.search_paths == ["/dir1", "/dir2"] + assert params.file_names == ["config.yml", "config.yaml"] + + +def test_config_search_params_partial(): + """Test ConfigSearchParams with partial values.""" + params = ConfigSearchParams(config_path="/explicit/path.yaml") + assert params.config_path == "/explicit/path.yaml" + assert params.search_paths is None + assert params.file_names is None + + +def test_config_search_params_equality(): + """Test that ConfigSearchParams supports equality comparison.""" + params1 = ConfigSearchParams( + config_path="/path.yaml", + search_paths=["/dir1"], + file_names=["config.yaml"], + ) + params2 = ConfigSearchParams( + config_path="/path.yaml", + search_paths=["/dir1"], + file_names=["config.yaml"], + ) + params3 = ConfigSearchParams(config_path="/different.yaml") + + assert params1 == params2 + assert params1 != params3 + + +def test_config_search_params_repr(): + """Test that ConfigSearchParams has a useful string representation.""" + params = ConfigSearchParams(config_path="/test.yaml") + repr_str = repr(params) + assert "ConfigSearchParams" in repr_str + assert "/test.yaml" in repr_str + From 96481e9a974b83d53a6ba7bc13b7c9a1cdb64da4 Mon Sep 17 00:00:00 2001 From: Prince Roshan Date: Fri, 28 Nov 2025 03:12:24 +0530 Subject: [PATCH 3/3] resolve comments --- mcp_fuzzer/cli/config_merge.py | 10 +++++---- mcp_fuzzer/config/loading/loader.py | 4 +++- mcp_fuzzer/config/loading/parser.py | 22 +++++++++++++++----- mcp_fuzzer/config/schema/builders.py | 31 ++++++++++++++++++++++++---- tests/unit/cli/test_cli.py | 6 ++++-- 5 files changed, 57 insertions(+), 16 deletions(-) diff --git a/mcp_fuzzer/cli/config_merge.py b/mcp_fuzzer/cli/config_merge.py index 3589f65..1bc9d64 100644 --- a/mcp_fuzzer/cli/config_merge.py +++ b/mcp_fuzzer/cli/config_merge.py @@ -80,10 +80,12 @@ def build_cli_config(args: argparse.Namespace) -> CliConfig: f"Failed to load configuration file '{args.config}': {exc}" ) else: - try: - config_mediator.apply_file() - except Exception as exc: - logging.debug(f"Error loading default configuration file: {exc}") + # apply_file() returns False if config loading fails (doesn't raise) + if not config_mediator.apply_file(): + logging.debug( + "Default configuration file not found or failed to load " + "(this is normal if no config file exists)" + ) _transfer_config_to_args(args) auth_manager = resolve_auth_port(args) diff --git a/mcp_fuzzer/config/loading/loader.py b/mcp_fuzzer/config/loading/loader.py index b2df967..10cec89 100644 --- a/mcp_fuzzer/config/loading/loader.py +++ b/mcp_fuzzer/config/loading/loader.py @@ -76,7 +76,9 @@ def load( def load_from_params( self, params: ConfigSearchParams - ) -> Tuple[ConfigDict | None, str | None]: + ) -> Tuple[ + ConfigDict | None, str | None + ]: """Load configuration using ConfigSearchParams. Args: diff --git a/mcp_fuzzer/config/loading/parser.py b/mcp_fuzzer/config/loading/parser.py index bf45152..cb48ee0 100644 --- a/mcp_fuzzer/config/loading/parser.py +++ b/mcp_fuzzer/config/loading/parser.py @@ -34,16 +34,28 @@ def load_config_file(file_path: str) -> dict[str, Any]: try: with open(file_path, "r", encoding="utf-8") as f: - return yaml.safe_load(f) or {} + data = yaml.safe_load(f) or {} + + # Validate that top-level config is a mapping/object + if not isinstance(data, dict): + raise ConfigFileError( + f"Top-level configuration in {file_path} must be a mapping/object, " + f"got {type(data).__name__}" + ) + + return data except yaml.YAMLError as e: raise ConfigFileError( f"Error parsing YAML configuration file {file_path}: {e}" - ) - except PermissionError: + ) from e + except PermissionError as e: raise ConfigFileError( f"Permission denied when reading configuration file: {file_path}" - ) + ) from e + except ConfigFileError: + # Re-raise ConfigFileError as-is (already has proper context) + raise except Exception as e: raise ConfigFileError( f"Unexpected error reading configuration file {file_path}: {e}" - ) + ) from e diff --git a/mcp_fuzzer/config/schema/builders.py b/mcp_fuzzer/config/schema/builders.py index b4b41f0..ce780e7 100644 --- a/mcp_fuzzer/config/schema/builders.py +++ b/mcp_fuzzer/config/schema/builders.py @@ -30,7 +30,12 @@ def build_timeout_schema() -> dict[str, Any]: def build_basic_schema() -> dict[str, Any]: - """Build schema for basic configuration properties.""" + """Build schema for basic configuration properties. + + Note: `safety_enabled` is a top-level convenience flag. If both + `safety_enabled` and `safety.enabled` are specified, `safety.enabled` + takes precedence. + """ return { "log_level": { "type": "string", @@ -38,7 +43,11 @@ def build_basic_schema() -> dict[str, Any]: }, "safety_enabled": { "type": "boolean", - "description": "Whether safety features are enabled", + "description": ( + "Whether safety features are enabled (convenience flag). " + "If both safety_enabled and safety.enabled are specified, " + "safety.enabled takes precedence." + ), }, "fs_root": { "type": "string", @@ -157,12 +166,23 @@ def build_custom_transports_schema() -> dict[str, Any]: def build_safety_schema() -> dict[str, Any]: - """Build schema for safety configuration.""" + """Build schema for safety configuration. + + Note: `safety.enabled` takes precedence over top-level `safety_enabled` + if both are specified. + """ return { "safety": { "type": "object", "properties": { - "enabled": {"type": "boolean"}, + "enabled": { + "type": "boolean", + "description": ( + "Whether safety features are enabled. " + "Takes precedence over top-level safety_enabled " + "if both are specified." + ), + }, "local_hosts": {"type": "array", "items": {"type": "string"}}, "no_network": {"type": "boolean"}, "header_denylist": {"type": "array", "items": {"type": "string"}}, @@ -172,6 +192,7 @@ def build_safety_schema() -> dict[str, Any]: }, "env_allowlist": {"type": "array", "items": {"type": "string"}}, }, + "additionalProperties": False, }, } @@ -228,8 +249,10 @@ def build_output_schema() -> dict[str, Any]: ), }, }, + "additionalProperties": False, }, }, + "additionalProperties": False, }, } diff --git a/tests/unit/cli/test_cli.py b/tests/unit/cli/test_cli.py index 226b2ea..a0ce7e3 100644 --- a/tests/unit/cli/test_cli.py +++ b/tests/unit/cli/test_cli.py @@ -560,13 +560,15 @@ def test_build_cli_config_uses_config_file(monkeypatch): def test_build_cli_config_handles_apply_config_error(caplog): caplog.set_level(logging.DEBUG) args = _base_args(config=None) + # apply_file() now returns False instead of raising exceptions with patch( "mcp_fuzzer.cli.config_merge.config_mediator.apply_file", - side_effect=Exception("fail"), + return_value=False, ): cli_config = build_cli_config(args) assert cli_config.merged["endpoint"] == "http://localhost" - assert "fail" in "".join(caplog.messages) + # Check that debug message was logged when config file is not found + assert "Default configuration file not found" in "".join(caplog.messages) def test_build_cli_config_raises_config_error(): args = _base_args(config="bad.yml")