diff --git a/.gitignore b/.gitignore index 78ec16a..c68d2ed 100644 --- a/.gitignore +++ b/.gitignore @@ -18,6 +18,7 @@ doc_ref/ # C extensions *REDESIGN*.md *FLOW.md +*USAGE.md *.so notes.md /reports/ diff --git a/mcp_fuzzer/cli/config_merge.py b/mcp_fuzzer/cli/config_merge.py index a0abf82..1bc9d64 100644 --- a/mcp_fuzzer/cli/config_merge.py +++ b/mcp_fuzzer/cli/config_merge.py @@ -7,8 +7,8 @@ import logging from typing import Any -from ..config import config as global_config, load_config_file, apply_config_file from ..exceptions import ConfigFileError +from ..client.adapters import config_mediator from ..client.settings import CliConfig from ..client.transport.auth_port import resolve_auth_port from .parser import create_argument_parser @@ -60,7 +60,7 @@ def _transfer_config_to_args(args: argparse.Namespace) -> None: ] for config_key, args_key in mapping: - config_value = global_config.get(config_key) + config_value = config_mediator.get(config_key) default_value = defaults_parser.get_default(args_key) if default_value is argparse.SUPPRESS: # pragma: no cover default_value = None @@ -73,17 +73,19 @@ def build_cli_config(args: argparse.Namespace) -> CliConfig: """Merge CLI args, config files, and resolved auth.""" if args.config: try: - loaded = load_config_file(args.config) - global_config.update(loaded) + loaded = config_mediator.load_file(args.config) + config_mediator.update(loaded) except Exception as exc: raise ConfigFileError( f"Failed to load configuration file '{args.config}': {exc}" ) else: - try: - apply_config_file() - except Exception as exc: - logging.debug(f"Error loading default configuration file: {exc}") + # apply_file() returns False if config loading fails (doesn't raise) + if not config_mediator.apply_file(): + logging.debug( + "Default configuration file not found or failed to load " + "(this is normal if no config file exists)" + ) _transfer_config_to_args(args) auth_manager = resolve_auth_port(args) @@ -135,7 +137,7 @@ def build_cli_config(args: argparse.Namespace) -> CliConfig: "auth_manager": auth_manager, } - global_config.update(merged) + config_mediator.update(merged) return CliConfig(args=args, merged=merged) diff --git a/mcp_fuzzer/cli/startup_info.py b/mcp_fuzzer/cli/startup_info.py index 03a6550..c513817 100644 --- a/mcp_fuzzer/cli/startup_info.py +++ b/mcp_fuzzer/cli/startup_info.py @@ -26,8 +26,8 @@ def print_startup_info(args: argparse.Namespace, config: dict | None = None) -> try: # Load and display the config file content import json - from ..config import load_config_file - raw_config = load_config_file(args.config) + from ..client.adapters import config_mediator + raw_config = config_mediator.load_file(args.config) config_json = json.dumps(raw_config, indent=2, sort_keys=True) console.print(f"[dim]{config_json}[/dim]") console.print() diff --git a/mcp_fuzzer/cli/validators.py b/mcp_fuzzer/cli/validators.py index e581a4f..3a1d921 100644 --- a/mcp_fuzzer/cli/validators.py +++ b/mcp_fuzzer/cli/validators.py @@ -11,7 +11,7 @@ from rich.console import Console from ..exceptions import ArgumentValidationError -from ..config import load_config_file +from ..client.adapters import config_mediator from ..transport.factory import create_transport from ..exceptions import MCPError, TransportError from ..env import ENVIRONMENT_VARIABLES, ValidationType @@ -65,7 +65,7 @@ def validate_arguments(self, args: argparse.Namespace) -> None: def validate_config_file(self, path: str) -> None: """Validate a config file and print success message.""" - load_config_file(path) + config_mediator.load_file(path) success_msg = ( "[green]:heavy_check_mark: Configuration file " f"'{path}' is valid[/green]" diff --git a/mcp_fuzzer/client/__init__.py b/mcp_fuzzer/client/__init__.py index d4b9305..f50701b 100644 --- a/mcp_fuzzer/client/__init__.py +++ b/mcp_fuzzer/client/__init__.py @@ -1,7 +1,72 @@ """Public client exports.""" from .base import MCPFuzzerClient +from .adapters import ConfigAdapter, config_mediator +from .constants import ( + CONTENT_TYPE_HEADER, + DEFAULT_FORCE_KILL_TIMEOUT, + DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT, + DEFAULT_HTTP_ACCEPT, + DEFAULT_MAX_TOTAL_FUZZING_TIME, + DEFAULT_MAX_TOOL_TIME, + DEFAULT_PROTOCOL_RUNS_PER_TYPE, + DEFAULT_PROTOCOL_VERSION, + DEFAULT_TIMEOUT, + DEFAULT_TOOL_RUNS, + DEFAULT_TOOL_TIMEOUT, + JSON_CONTENT_TYPE, + MCP_PROTOCOL_VERSION_HEADER, + MCP_SESSION_ID_HEADER, + PROCESS_CLEANUP_TIMEOUT, + PROCESS_FORCE_KILL_TIMEOUT, + PROCESS_TERMINATION_TIMEOUT, + PROCESS_WAIT_TIMEOUT, + SAFETY_ENV_ALLOWLIST, + SAFETY_HEADER_DENYLIST, + SAFETY_LOCAL_HOSTS, + SAFETY_NO_NETWORK_DEFAULT, + SAFETY_PROXY_ENV_DENYLIST, + SSE_CONTENT_TYPE, + WATCHDOG_DEFAULT_CHECK_INTERVAL, + WATCHDOG_EXTRA_BUFFER, + WATCHDOG_MAX_HANG_ADDITIONAL, +) +from .ports import ConfigPort UnifiedMCPFuzzerClient = MCPFuzzerClient -__all__ = ["MCPFuzzerClient", "UnifiedMCPFuzzerClient"] +__all__ = [ + "MCPFuzzerClient", + "UnifiedMCPFuzzerClient", + "ConfigPort", + "ConfigAdapter", + "config_mediator", + # Constants + "DEFAULT_PROTOCOL_VERSION", + "CONTENT_TYPE_HEADER", + "JSON_CONTENT_TYPE", + "SSE_CONTENT_TYPE", + "DEFAULT_HTTP_ACCEPT", + "MCP_SESSION_ID_HEADER", + "MCP_PROTOCOL_VERSION_HEADER", + "WATCHDOG_DEFAULT_CHECK_INTERVAL", + "WATCHDOG_EXTRA_BUFFER", + "WATCHDOG_MAX_HANG_ADDITIONAL", + "SAFETY_LOCAL_HOSTS", + "SAFETY_NO_NETWORK_DEFAULT", + "SAFETY_HEADER_DENYLIST", + "SAFETY_PROXY_ENV_DENYLIST", + "SAFETY_ENV_ALLOWLIST", + "DEFAULT_TOOL_RUNS", + "DEFAULT_PROTOCOL_RUNS_PER_TYPE", + "DEFAULT_TIMEOUT", + "DEFAULT_TOOL_TIMEOUT", + "DEFAULT_MAX_TOOL_TIME", + "DEFAULT_MAX_TOTAL_FUZZING_TIME", + "DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT", + "DEFAULT_FORCE_KILL_TIMEOUT", + "PROCESS_TERMINATION_TIMEOUT", + "PROCESS_FORCE_KILL_TIMEOUT", + "PROCESS_CLEANUP_TIMEOUT", + "PROCESS_WAIT_TIMEOUT", +] diff --git a/mcp_fuzzer/client/adapters/__init__.py b/mcp_fuzzer/client/adapters/__init__.py new file mode 100644 index 0000000..6256e32 --- /dev/null +++ b/mcp_fuzzer/client/adapters/__init__.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python3 +"""Adapter implementations for Port and Adapter pattern. + +Adapters implement the ports (interfaces) by adapting external modules. +This is where the mediation between modules happens. +""" + +from .config_adapter import ConfigAdapter, config_mediator + +__all__ = ["ConfigAdapter", "config_mediator"] + diff --git a/mcp_fuzzer/client/adapters/config_adapter.py b/mcp_fuzzer/client/adapters/config_adapter.py new file mode 100644 index 0000000..cba8f83 --- /dev/null +++ b/mcp_fuzzer/client/adapters/config_adapter.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +"""Configuration adapter implementation for Port and Adapter pattern. + +This module implements the ConfigPort interface by adapting the config module. +This is the adapter that mediates all configuration access. +""" + +from __future__ import annotations + +from typing import Any + +from ...config import ( + apply_config_file, + get_config_schema, + load_config_file, +) +from ...config.core.manager import config as global_config +from ..ports.config_port import ConfigPort + + +class ConfigAdapter(ConfigPort): + """Adapter that implements ConfigPort by delegating to the config module. + + This adapter acts as a mediator between other modules and the config module, + implementing the Port and Adapter (Hexagonal Architecture) pattern. + """ + + def __init__(self, config_instance: Any = None): + """Initialize the config adapter. + + Args: + config_instance: Optional configuration instance to use. + If None, uses the global config instance. + """ + self._config = config_instance or global_config + + def get(self, key: str, default: Any = None) -> Any: + """Get a configuration value by key. + + Args: + key: Configuration key + default: Default value if key not found + + Returns: + Configuration value or default + """ + return self._config.get(key, default) + + def set(self, key: str, value: Any) -> None: + """Set a configuration value. + + Args: + key: Configuration key + value: Configuration value + """ + self._config.set(key, value) + + def update(self, config_dict: dict[str, Any]) -> None: + """Update configuration with values from a dictionary. + + Args: + config_dict: Dictionary of configuration values to update + """ + self._config.update(config_dict) + + def load_file(self, file_path: str) -> dict[str, Any]: + """Load configuration from a file. + + Args: + file_path: Path to configuration file + + Returns: + Dictionary containing loaded configuration + + Raises: + ConfigFileError: If file cannot be loaded + """ + return load_config_file(file_path) + + def apply_file( + self, + config_path: str | None = None, + search_paths: list[str] | None = None, + file_names: list[str] | None = None, + ) -> bool: + """Load and apply configuration from a file. + + Args: + config_path: Explicit path to config file + search_paths: List of directories to search + file_names: List of file names to search for + + Returns: + True if configuration was loaded and applied, False otherwise + """ + return apply_config_file( + config_path=config_path, + search_paths=search_paths, + file_names=file_names, + ) + + def get_schema(self) -> dict[str, Any]: + """Get the JSON schema for configuration validation. + + Returns: + JSON schema dictionary + """ + return get_config_schema() + + +# Global instance for convenience (acts as the mediator) +config_mediator: ConfigPort = ConfigAdapter() + diff --git a/mcp_fuzzer/client/constants.py b/mcp_fuzzer/client/constants.py new file mode 100644 index 0000000..4e08737 --- /dev/null +++ b/mcp_fuzzer/client/constants.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 +"""Configuration constants exposed through client module. + +This module re-exports all configuration constants so other modules +can access them through the client mediator instead of directly +from the config module. + +Uses lazy imports to avoid circular dependencies. +""" + +# Lazy import to avoid circular dependencies +# Import happens at module level but after client module is initialized +def _get_constants(): + """Lazy import of constants to break circular dependencies.""" + from mcp_fuzzer.config.core.constants import ( + CONTENT_TYPE_HEADER, + DEFAULT_FORCE_KILL_TIMEOUT, + DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT, + DEFAULT_HTTP_ACCEPT, + DEFAULT_MAX_TOTAL_FUZZING_TIME, + DEFAULT_MAX_TOOL_TIME, + DEFAULT_PROTOCOL_RUNS_PER_TYPE, + DEFAULT_PROTOCOL_VERSION, + DEFAULT_TIMEOUT, + DEFAULT_TOOL_RUNS, + DEFAULT_TOOL_TIMEOUT, + JSON_CONTENT_TYPE, + MCP_PROTOCOL_VERSION_HEADER, + MCP_SESSION_ID_HEADER, + PROCESS_CLEANUP_TIMEOUT, + PROCESS_FORCE_KILL_TIMEOUT, + PROCESS_TERMINATION_TIMEOUT, + PROCESS_WAIT_TIMEOUT, + SAFETY_ENV_ALLOWLIST, + SAFETY_HEADER_DENYLIST, + SAFETY_LOCAL_HOSTS, + SAFETY_NO_NETWORK_DEFAULT, + SAFETY_PROXY_ENV_DENYLIST, + SSE_CONTENT_TYPE, + WATCHDOG_DEFAULT_CHECK_INTERVAL, + WATCHDOG_EXTRA_BUFFER, + WATCHDOG_MAX_HANG_ADDITIONAL, + ) + return { + "CONTENT_TYPE_HEADER": CONTENT_TYPE_HEADER, + "DEFAULT_FORCE_KILL_TIMEOUT": DEFAULT_FORCE_KILL_TIMEOUT, + "DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT": DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT, + "DEFAULT_HTTP_ACCEPT": DEFAULT_HTTP_ACCEPT, + "DEFAULT_MAX_TOTAL_FUZZING_TIME": DEFAULT_MAX_TOTAL_FUZZING_TIME, + "DEFAULT_MAX_TOOL_TIME": DEFAULT_MAX_TOOL_TIME, + "DEFAULT_PROTOCOL_RUNS_PER_TYPE": DEFAULT_PROTOCOL_RUNS_PER_TYPE, + "DEFAULT_PROTOCOL_VERSION": DEFAULT_PROTOCOL_VERSION, + "DEFAULT_TIMEOUT": DEFAULT_TIMEOUT, + "DEFAULT_TOOL_RUNS": DEFAULT_TOOL_RUNS, + "DEFAULT_TOOL_TIMEOUT": DEFAULT_TOOL_TIMEOUT, + "JSON_CONTENT_TYPE": JSON_CONTENT_TYPE, + "MCP_PROTOCOL_VERSION_HEADER": MCP_PROTOCOL_VERSION_HEADER, + "MCP_SESSION_ID_HEADER": MCP_SESSION_ID_HEADER, + "PROCESS_CLEANUP_TIMEOUT": PROCESS_CLEANUP_TIMEOUT, + "PROCESS_FORCE_KILL_TIMEOUT": PROCESS_FORCE_KILL_TIMEOUT, + "PROCESS_TERMINATION_TIMEOUT": PROCESS_TERMINATION_TIMEOUT, + "PROCESS_WAIT_TIMEOUT": PROCESS_WAIT_TIMEOUT, + "SAFETY_ENV_ALLOWLIST": SAFETY_ENV_ALLOWLIST, + "SAFETY_HEADER_DENYLIST": SAFETY_HEADER_DENYLIST, + "SAFETY_LOCAL_HOSTS": SAFETY_LOCAL_HOSTS, + "SAFETY_NO_NETWORK_DEFAULT": SAFETY_NO_NETWORK_DEFAULT, + "SAFETY_PROXY_ENV_DENYLIST": SAFETY_PROXY_ENV_DENYLIST, + "SSE_CONTENT_TYPE": SSE_CONTENT_TYPE, + "WATCHDOG_DEFAULT_CHECK_INTERVAL": WATCHDOG_DEFAULT_CHECK_INTERVAL, + "WATCHDOG_EXTRA_BUFFER": WATCHDOG_EXTRA_BUFFER, + "WATCHDOG_MAX_HANG_ADDITIONAL": WATCHDOG_MAX_HANG_ADDITIONAL, + } + +# Import constants lazily +_constants = _get_constants() + +# Export all constants +CONTENT_TYPE_HEADER = _constants["CONTENT_TYPE_HEADER"] +DEFAULT_FORCE_KILL_TIMEOUT = _constants["DEFAULT_FORCE_KILL_TIMEOUT"] +DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = _constants["DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT"] +DEFAULT_HTTP_ACCEPT = _constants["DEFAULT_HTTP_ACCEPT"] +DEFAULT_MAX_TOTAL_FUZZING_TIME = _constants["DEFAULT_MAX_TOTAL_FUZZING_TIME"] +DEFAULT_MAX_TOOL_TIME = _constants["DEFAULT_MAX_TOOL_TIME"] +DEFAULT_PROTOCOL_RUNS_PER_TYPE = _constants["DEFAULT_PROTOCOL_RUNS_PER_TYPE"] +DEFAULT_PROTOCOL_VERSION = _constants["DEFAULT_PROTOCOL_VERSION"] +DEFAULT_TIMEOUT = _constants["DEFAULT_TIMEOUT"] +DEFAULT_TOOL_RUNS = _constants["DEFAULT_TOOL_RUNS"] +DEFAULT_TOOL_TIMEOUT = _constants["DEFAULT_TOOL_TIMEOUT"] +JSON_CONTENT_TYPE = _constants["JSON_CONTENT_TYPE"] +MCP_PROTOCOL_VERSION_HEADER = _constants["MCP_PROTOCOL_VERSION_HEADER"] +MCP_SESSION_ID_HEADER = _constants["MCP_SESSION_ID_HEADER"] +PROCESS_CLEANUP_TIMEOUT = _constants["PROCESS_CLEANUP_TIMEOUT"] +PROCESS_FORCE_KILL_TIMEOUT = _constants["PROCESS_FORCE_KILL_TIMEOUT"] +PROCESS_TERMINATION_TIMEOUT = _constants["PROCESS_TERMINATION_TIMEOUT"] +PROCESS_WAIT_TIMEOUT = _constants["PROCESS_WAIT_TIMEOUT"] +SAFETY_ENV_ALLOWLIST = _constants["SAFETY_ENV_ALLOWLIST"] +SAFETY_HEADER_DENYLIST = _constants["SAFETY_HEADER_DENYLIST"] +SAFETY_LOCAL_HOSTS = _constants["SAFETY_LOCAL_HOSTS"] +SAFETY_NO_NETWORK_DEFAULT = _constants["SAFETY_NO_NETWORK_DEFAULT"] +SAFETY_PROXY_ENV_DENYLIST = _constants["SAFETY_PROXY_ENV_DENYLIST"] +SSE_CONTENT_TYPE = _constants["SSE_CONTENT_TYPE"] +WATCHDOG_DEFAULT_CHECK_INTERVAL = _constants["WATCHDOG_DEFAULT_CHECK_INTERVAL"] +WATCHDOG_EXTRA_BUFFER = _constants["WATCHDOG_EXTRA_BUFFER"] +WATCHDOG_MAX_HANG_ADDITIONAL = _constants["WATCHDOG_MAX_HANG_ADDITIONAL"] + +__all__ = [ + "DEFAULT_PROTOCOL_VERSION", + "CONTENT_TYPE_HEADER", + "JSON_CONTENT_TYPE", + "SSE_CONTENT_TYPE", + "DEFAULT_HTTP_ACCEPT", + "MCP_SESSION_ID_HEADER", + "MCP_PROTOCOL_VERSION_HEADER", + "WATCHDOG_DEFAULT_CHECK_INTERVAL", + "WATCHDOG_EXTRA_BUFFER", + "WATCHDOG_MAX_HANG_ADDITIONAL", + "SAFETY_LOCAL_HOSTS", + "SAFETY_NO_NETWORK_DEFAULT", + "SAFETY_HEADER_DENYLIST", + "SAFETY_PROXY_ENV_DENYLIST", + "SAFETY_ENV_ALLOWLIST", + "DEFAULT_TOOL_RUNS", + "DEFAULT_PROTOCOL_RUNS_PER_TYPE", + "DEFAULT_TIMEOUT", + "DEFAULT_TOOL_TIMEOUT", + "DEFAULT_MAX_TOOL_TIME", + "DEFAULT_MAX_TOTAL_FUZZING_TIME", + "DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT", + "DEFAULT_FORCE_KILL_TIMEOUT", + "PROCESS_TERMINATION_TIMEOUT", + "PROCESS_FORCE_KILL_TIMEOUT", + "PROCESS_CLEANUP_TIMEOUT", + "PROCESS_WAIT_TIMEOUT", +] + diff --git a/mcp_fuzzer/client/ports/__init__.py b/mcp_fuzzer/client/ports/__init__.py new file mode 100644 index 0000000..c1684fc --- /dev/null +++ b/mcp_fuzzer/client/ports/__init__.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python3 +"""Port interfaces for Port and Adapter pattern. + +Ports define the contracts (interfaces) that adapters must implement. +All modules should depend on ports, not concrete implementations. +""" + +from .config_port import ConfigPort + +__all__ = ["ConfigPort"] + diff --git a/mcp_fuzzer/client/ports/config_port.py b/mcp_fuzzer/client/ports/config_port.py new file mode 100644 index 0000000..53d5f5b --- /dev/null +++ b/mcp_fuzzer/client/ports/config_port.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +"""Configuration port interface for Port and Adapter pattern. + +This module defines the port (interface) for configuration access. +All modules should interact with configuration through this port, +not directly with the config module. +""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Any + + +class ConfigPort(ABC): + """Port interface for configuration access. + + This defines the contract that all configuration adapters must implement. + Modules should depend on this interface, not concrete implementations. + """ + + @abstractmethod + def get(self, key: str, default: Any = None) -> Any: + """Get a configuration value by key. + + Args: + key: Configuration key + default: Default value if key not found + + Returns: + Configuration value or default + """ + pass + + @abstractmethod + def set(self, key: str, value: Any) -> None: + """Set a configuration value. + + Args: + key: Configuration key + value: Configuration value + """ + pass + + @abstractmethod + def update(self, config_dict: dict[str, Any]) -> None: + """Update configuration with values from a dictionary. + + Args: + config_dict: Dictionary of configuration values to update + """ + pass + + @abstractmethod + def load_file(self, file_path: str) -> dict[str, Any]: + """Load configuration from a file. + + Args: + file_path: Path to configuration file + + Returns: + Dictionary containing loaded configuration + + Raises: + ConfigFileError: If file cannot be loaded + """ + pass + + @abstractmethod + def apply_file( + self, + config_path: str | None = None, + search_paths: list[str] | None = None, + file_names: list[str] | None = None, + ) -> bool: + """Load and apply configuration from a file. + + Args: + config_path: Explicit path to config file + search_paths: List of directories to search + file_names: List of file names to search for + + Returns: + True if configuration was loaded and applied, False otherwise + """ + pass + + @abstractmethod + def get_schema(self) -> dict[str, Any]: + """Get the JSON schema for configuration validation. + + Returns: + JSON schema dictionary + """ + pass + diff --git a/mcp_fuzzer/client/tool_client.py b/mcp_fuzzer/client/tool_client.py index 9dd6786..eae45a3 100644 --- a/mcp_fuzzer/client/tool_client.py +++ b/mcp_fuzzer/client/tool_client.py @@ -12,7 +12,8 @@ from ..auth import AuthManager from ..fuzz_engine.fuzzer import ToolFuzzer from ..safety_system.safety import SafetyFilter, SafetyProvider -from ..config import ( +# Import constants directly from config (constants are values, not behavior) +from ..config.core.constants import ( DEFAULT_TOOL_RUNS, DEFAULT_MAX_TOOL_TIME, DEFAULT_MAX_TOTAL_FUZZING_TIME, diff --git a/mcp_fuzzer/config/__init__.py b/mcp_fuzzer/config/__init__.py index 39b8f1c..1ad00b8 100644 --- a/mcp_fuzzer/config/__init__.py +++ b/mcp_fuzzer/config/__init__.py @@ -1,45 +1,53 @@ #!/usr/bin/env python3 """Configuration module for MCP Fuzzer.""" -# Import all constants explicitly -from .constants import ( - DEFAULT_PROTOCOL_VERSION, +# Import all constants and core functionality +from .core import ( CONTENT_TYPE_HEADER, - JSON_CONTENT_TYPE, - SSE_CONTENT_TYPE, + DEFAULT_FORCE_KILL_TIMEOUT, + DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT, DEFAULT_HTTP_ACCEPT, - MCP_SESSION_ID_HEADER, + DEFAULT_MAX_TOTAL_FUZZING_TIME, + DEFAULT_MAX_TOOL_TIME, + DEFAULT_PROTOCOL_RUNS_PER_TYPE, + DEFAULT_PROTOCOL_VERSION, + DEFAULT_TIMEOUT, + DEFAULT_TOOL_RUNS, + DEFAULT_TOOL_TIMEOUT, + JSON_CONTENT_TYPE, MCP_PROTOCOL_VERSION_HEADER, - WATCHDOG_DEFAULT_CHECK_INTERVAL, - WATCHDOG_EXTRA_BUFFER, - WATCHDOG_MAX_HANG_ADDITIONAL, + MCP_SESSION_ID_HEADER, + PROCESS_CLEANUP_TIMEOUT, + PROCESS_FORCE_KILL_TIMEOUT, + PROCESS_TERMINATION_TIMEOUT, + PROCESS_WAIT_TIMEOUT, + SAFETY_ENV_ALLOWLIST, + SAFETY_HEADER_DENYLIST, SAFETY_LOCAL_HOSTS, SAFETY_NO_NETWORK_DEFAULT, - SAFETY_HEADER_DENYLIST, SAFETY_PROXY_ENV_DENYLIST, - SAFETY_ENV_ALLOWLIST, - DEFAULT_TOOL_RUNS, - DEFAULT_PROTOCOL_RUNS_PER_TYPE, - DEFAULT_TIMEOUT, - DEFAULT_TOOL_TIMEOUT, - DEFAULT_MAX_TOOL_TIME, - DEFAULT_MAX_TOTAL_FUZZING_TIME, - DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT, - DEFAULT_FORCE_KILL_TIMEOUT, + SSE_CONTENT_TYPE, + WATCHDOG_DEFAULT_CHECK_INTERVAL, + WATCHDOG_EXTRA_BUFFER, + WATCHDOG_MAX_HANG_ADDITIONAL, + config, ) -# Import configuration manager and global instance -from .manager import config - # Import loader functions -from .loader import ( +from .loading import ( + ConfigLoader, + ConfigSearchParams, + apply_config_file, find_config_file, load_config_file, - apply_config_file, - get_config_schema, - load_custom_transports, ) +# Import schema +from .schema import get_config_schema + +# Import extensions +from .extensions import load_custom_transports + __all__ = [ # Constants "DEFAULT_PROTOCOL_VERSION", @@ -65,12 +73,19 @@ "DEFAULT_MAX_TOTAL_FUZZING_TIME", "DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT", "DEFAULT_FORCE_KILL_TIMEOUT", + "PROCESS_TERMINATION_TIMEOUT", + "PROCESS_FORCE_KILL_TIMEOUT", + "PROCESS_CLEANUP_TIMEOUT", + "PROCESS_WAIT_TIMEOUT", # Manager "config", - # Loader functions + # Loader helpers + "ConfigLoader", "find_config_file", "load_config_file", "apply_config_file", "get_config_schema", "load_custom_transports", -] \ No newline at end of file + # Search parameters + "ConfigSearchParams", +] diff --git a/mcp_fuzzer/config/core/__init__.py b/mcp_fuzzer/config/core/__init__.py new file mode 100644 index 0000000..b2703a4 --- /dev/null +++ b/mcp_fuzzer/config/core/__init__.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +"""Core configuration management.""" + +from .constants import ( + CONTENT_TYPE_HEADER, + DEFAULT_FORCE_KILL_TIMEOUT, + DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT, + DEFAULT_HTTP_ACCEPT, + DEFAULT_MAX_TOTAL_FUZZING_TIME, + DEFAULT_MAX_TOOL_TIME, + DEFAULT_PROTOCOL_RUNS_PER_TYPE, + DEFAULT_PROTOCOL_VERSION, + DEFAULT_TIMEOUT, + DEFAULT_TOOL_RUNS, + DEFAULT_TOOL_TIMEOUT, + JSON_CONTENT_TYPE, + MCP_PROTOCOL_VERSION_HEADER, + MCP_SESSION_ID_HEADER, + PROCESS_CLEANUP_TIMEOUT, + PROCESS_FORCE_KILL_TIMEOUT, + PROCESS_TERMINATION_TIMEOUT, + PROCESS_WAIT_TIMEOUT, + SAFETY_ENV_ALLOWLIST, + SAFETY_HEADER_DENYLIST, + SAFETY_LOCAL_HOSTS, + SAFETY_NO_NETWORK_DEFAULT, + SAFETY_PROXY_ENV_DENYLIST, + SSE_CONTENT_TYPE, + WATCHDOG_DEFAULT_CHECK_INTERVAL, + WATCHDOG_EXTRA_BUFFER, + WATCHDOG_MAX_HANG_ADDITIONAL, +) +from .manager import Configuration, config + +__all__ = [ + # Constants + "DEFAULT_PROTOCOL_VERSION", + "CONTENT_TYPE_HEADER", + "JSON_CONTENT_TYPE", + "SSE_CONTENT_TYPE", + "DEFAULT_HTTP_ACCEPT", + "MCP_SESSION_ID_HEADER", + "MCP_PROTOCOL_VERSION_HEADER", + "WATCHDOG_DEFAULT_CHECK_INTERVAL", + "WATCHDOG_EXTRA_BUFFER", + "WATCHDOG_MAX_HANG_ADDITIONAL", + "SAFETY_LOCAL_HOSTS", + "SAFETY_NO_NETWORK_DEFAULT", + "SAFETY_HEADER_DENYLIST", + "SAFETY_PROXY_ENV_DENYLIST", + "SAFETY_ENV_ALLOWLIST", + "DEFAULT_TOOL_RUNS", + "DEFAULT_PROTOCOL_RUNS_PER_TYPE", + "DEFAULT_TIMEOUT", + "DEFAULT_TOOL_TIMEOUT", + "DEFAULT_MAX_TOOL_TIME", + "DEFAULT_MAX_TOTAL_FUZZING_TIME", + "DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT", + "DEFAULT_FORCE_KILL_TIMEOUT", + "PROCESS_TERMINATION_TIMEOUT", + "PROCESS_FORCE_KILL_TIMEOUT", + "PROCESS_CLEANUP_TIMEOUT", + "PROCESS_WAIT_TIMEOUT", + # Manager + "Configuration", + "config", +] + diff --git a/mcp_fuzzer/config/constants.py b/mcp_fuzzer/config/core/constants.py similarity index 96% rename from mcp_fuzzer/config/constants.py rename to mcp_fuzzer/config/core/constants.py index 9677b7a..2851d26 100644 --- a/mcp_fuzzer/config/constants.py +++ b/mcp_fuzzer/config/core/constants.py @@ -23,7 +23,7 @@ # Safety defaults # Hosts allowed for network operations by default. Keep local-only. SAFETY_LOCAL_HOSTS: set[str] = {"localhost", "127.0.0.1", "::1"} -# Default to deny network to non-local hosts +# Default to allow network access (set to True to deny by default) SAFETY_NO_NETWORK_DEFAULT: bool = False # Headers that should never be forwarded by default to avoid leakage SAFETY_HEADER_DENYLIST: set[str] = {"authorization", "cookie"} diff --git a/mcp_fuzzer/config/core/manager.py b/mcp_fuzzer/config/core/manager.py new file mode 100644 index 0000000..e725614 --- /dev/null +++ b/mcp_fuzzer/config/core/manager.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python3 +"""Configuration management for MCP Fuzzer.""" + +import os +from typing import Any + + +def _get_float_from_env(key: str, default: float) -> float: + """Get a float value from environment variable. + + Args: + key: Environment variable name + default: Default value if not set or invalid + + Returns: + Float value from environment or default + """ + val = os.getenv(key) + if val is None: + return default + try: + return float(val) + except (TypeError, ValueError): + return default + + +def _get_bool_from_env(key: str, default: bool = False) -> bool: + """Get a boolean value from environment variable. + + Args: + key: Environment variable name + default: Default value if not set + + Returns: + Boolean value from environment or default + """ + val = os.getenv(key) + if val is None: + return default + return val.strip().lower() in {"1", "true", "yes", "on"} + + +class Configuration: + """Centralized configuration management for MCP Fuzzer.""" + + def __init__(self): + self._config: dict[str, Any] = {} + self._load_from_env() + + def _load_from_env(self) -> None: + """Load configuration values from environment variables.""" + self._config["timeout"] = _get_float_from_env("MCP_FUZZER_TIMEOUT", 30.0) + self._config["log_level"] = os.getenv("MCP_FUZZER_LOG_LEVEL", "INFO") + self._config["safety_enabled"] = _get_bool_from_env( + "MCP_FUZZER_SAFETY_ENABLED", False + ) + self._config["fs_root"] = os.getenv( + "MCP_FUZZER_FS_ROOT", os.path.expanduser("~/.mcp_fuzzer") + ) + self._config["http_timeout"] = _get_float_from_env( + "MCP_FUZZER_HTTP_TIMEOUT", 30.0 + ) + self._config["sse_timeout"] = _get_float_from_env( + "MCP_FUZZER_SSE_TIMEOUT", 30.0 + ) + self._config["stdio_timeout"] = _get_float_from_env( + "MCP_FUZZER_STDIO_TIMEOUT", 30.0 + ) + + def get(self, key: str, default: Any = None) -> Any: + """Get a configuration value by key.""" + return self._config.get(key, default) + + def set(self, key: str, value: Any) -> None: + """Set a configuration value.""" + self._config[key] = value + + def update(self, config_dict: dict[str, Any]) -> None: + """Update configuration with values from a dictionary.""" + self._config.update(config_dict) + +# Global configuration instance +config = Configuration() \ No newline at end of file diff --git a/mcp_fuzzer/config/extensions/__init__.py b/mcp_fuzzer/config/extensions/__init__.py new file mode 100644 index 0000000..34c0c30 --- /dev/null +++ b/mcp_fuzzer/config/extensions/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 +"""Configuration extensions (custom transports, etc.).""" + +from .transports import load_custom_transports + +__all__ = ["load_custom_transports"] + diff --git a/mcp_fuzzer/config/extensions/transports.py b/mcp_fuzzer/config/extensions/transports.py new file mode 100644 index 0000000..892af94 --- /dev/null +++ b/mcp_fuzzer/config/extensions/transports.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +"""Custom transport registration helpers.""" + +from __future__ import annotations + +import importlib +import logging +from typing import Any + +from ...exceptions import ConfigFileError, MCPError +from ...transport.custom import register_custom_transport +from ...transport.base import TransportProtocol + +logger = logging.getLogger(__name__) + + +def load_custom_transports(config_data: dict[str, Any]) -> None: + """Load and register custom transports from configuration. + + Args: + config_data: Configuration dictionary containing custom_transports section + """ + custom_transports = config_data.get("custom_transports", {}) + + for transport_name, transport_config in custom_transports.items(): + try: + module_path = transport_config["module"] + class_name = transport_config["class"] + + module = importlib.import_module(module_path) + transport_class = getattr(module, class_name) + try: + if not issubclass(transport_class, TransportProtocol): + raise ConfigFileError( + f"{module_path}.{class_name} must subclass TransportProtocol" + ) + except TypeError: + raise ConfigFileError(f"{module_path}.{class_name} is not a class") + + description = transport_config.get("description", "") + config_schema = transport_config.get("config_schema") + factory_fn = None + factory_path = transport_config.get("factory") + if factory_path: + try: + mod_path, attr = factory_path.rsplit(".", 1) + except ValueError as ve: + raise ConfigFileError( + f"Invalid factory path '{factory_path}'; expected 'module.attr'" + ) from ve + fmod = importlib.import_module(mod_path) + factory_fn = getattr(fmod, attr) + if not callable(factory_fn): + raise ConfigFileError(f"Factory '{factory_path}' is not callable") + + register_custom_transport( + name=transport_name, + transport_class=transport_class, + description=description, + config_schema=config_schema, + factory_function=factory_fn, + ) + + logger.info( + "Loaded custom transport '%s' from %s.%s", + transport_name, + module_path, + class_name, + ) + + except MCPError: + raise + except Exception as e: + logger.error( + "Failed to load custom transport '%s': %s", transport_name, e + ) + raise ConfigFileError( + f"Failed to load custom transport '{transport_name}': {e}" + ) from e diff --git a/mcp_fuzzer/config/loader.py b/mcp_fuzzer/config/loader.py deleted file mode 100644 index fbc21ee..0000000 --- a/mcp_fuzzer/config/loader.py +++ /dev/null @@ -1,384 +0,0 @@ -#!/usr/bin/env python3 -"""Configuration file loader for MCP Fuzzer. - -This module provides functionality to load configuration from YAML files. -""" - -import os -import logging -from pathlib import Path -from typing import Any - -import yaml - -from .manager import config -from ..exceptions import ConfigFileError, MCPError -from ..transport.custom import register_custom_transport -from ..transport.base import TransportProtocol -import importlib - -logger = logging.getLogger(__name__) - -def find_config_file( - config_path: str | None = None, - search_paths: list[str] | None = None, - file_names: list[str] | None = None, -) -> str | None: - """Find a configuration file in the given paths. - - Args: - config_path: Explicit path to config file, takes precedence if provided - search_paths: List of directories to search for config files - file_names: List of file names to search for - - Returns: - Path to the found config file or None if not found - """ - # If explicit path is provided, use it - if config_path and os.path.isfile(config_path): - return config_path - - # Default search paths - if search_paths is None: - search_paths = [ - os.getcwd(), # Current directory - str(Path.home() / ".config" / "mcp-fuzzer"), # User config directory - ] - - # Default file names - if file_names is None: - file_names = ["mcp-fuzzer.yml", "mcp-fuzzer.yaml"] - - # Search for config files - for path in search_paths: - if not os.path.isdir(path): - continue - - for name in file_names: - file_path = os.path.join(path, name) - if os.path.isfile(file_path): - return file_path - - return None - -def load_config_file(file_path: str) -> dict[str, Any]: - """Load configuration from a YAML file. - - Args: - file_path: Path to the configuration file - - Returns: - Dictionary containing the configuration - - Raises: - ConfigFileError: If the file cannot be found, parsed, or has permission issues - """ - if not os.path.isfile(file_path): - raise ConfigFileError(f"Configuration file not found: {file_path}") - - # Verify file extension - if not file_path.endswith((".yml", ".yaml")): - raise ConfigFileError( - f"Unsupported configuration file format: {file_path}. " - "Only YAML files with .yml or .yaml extensions are supported." - ) - - try: - with open(file_path, "r", encoding="utf-8") as f: - return yaml.safe_load(f) or {} - except yaml.YAMLError as e: - raise ConfigFileError( - f"Error parsing YAML configuration file {file_path}: {str(e)}" - ) - except PermissionError: - raise ConfigFileError( - f"Permission denied when reading configuration file: {file_path}" - ) - except Exception as e: - raise ConfigFileError( - f"Unexpected error reading configuration file {file_path}: {str(e)}" - ) - -def apply_config_file( - config_path: str | None = None, - search_paths: list[str] | None = None, - file_names: list[str] | None = None, -) -> bool: - """Find and apply configuration from a file. - - Args: - config_path: Explicit path to config file, takes precedence if provided - search_paths: List of directories to search for config files - file_names: List of file names to search for - - Returns: - True if configuration was loaded and applied, False otherwise - """ - # Find config file - file_path = find_config_file(config_path, search_paths, file_names) - if not file_path: - logger.debug("No configuration file found") - return False - - logger.info(f"Loading configuration from {file_path}") - try: - config_data = load_config_file(file_path) - load_custom_transports(config_data) - except (ConfigFileError, MCPError): - logger.exception("Failed to load configuration from %s", file_path) - return False - config.update(config_data) - return True - -def get_config_schema() -> dict[str, Any]: - """Get the configuration schema. - - Returns: - Dictionary describing the configuration schema - """ - return { - "type": "object", - "properties": { - "timeout": {"type": "number", "description": "Default timeout in seconds"}, - "tool_timeout": { - "type": "number", - "description": "Tool-specific timeout in seconds", - }, - "log_level": { - "type": "string", - "enum": ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], - }, - "safety_enabled": { - "type": "boolean", - "description": "Whether safety features are enabled", - }, - "fs_root": { - "type": "string", - "description": "Root directory for file operations", - }, - "http_timeout": { - "type": "number", - "description": "HTTP transport timeout in seconds", - }, - "sse_timeout": { - "type": "number", - "description": "SSE transport timeout in seconds", - }, - "stdio_timeout": { - "type": "number", - "description": "STDIO transport timeout in seconds", - }, - "mode": {"type": "string", "enum": ["tools", "tool", "protocol", "both"]}, - "phase": {"type": "string", "enum": ["realistic", "aggressive", "both"]}, - "protocol": { - "type": "string", - "enum": ["http", "https", "sse", "stdio", "streamablehttp"], - }, - "endpoint": {"type": "string", "description": "Server endpoint URL"}, - "runs": {"type": "integer", "description": "Number of fuzzing runs"}, - "runs_per_type": { - "type": "integer", - "description": "Number of runs per protocol type", - }, - "protocol_type": { - "type": "string", - "description": "Specific protocol type to fuzz", - }, - "no_network": {"type": "boolean", "description": "Disable network access"}, - "allow_hosts": { - "type": "array", - "items": {"type": "string"}, - "description": "List of allowed hosts", - }, - "max_concurrency": { - "type": "integer", - "description": "Maximum concurrent operations", - }, - "auth": { - "type": "object", - "properties": { - "providers": { - "type": "array", - "items": { - "type": "object", - "properties": { - "type": { - "type": "string", - "enum": ["api_key", "basic", "oauth", "custom"], - }, - "id": {"type": "string"}, - "config": {"type": "object"}, - }, - "required": ["type", "id"], - }, - }, - "mappings": { - "type": "object", - "additionalProperties": {"type": "string"}, - }, - }, - }, - "custom_transports": { - "type": "object", - "description": "Configuration for custom transport mechanisms", - "patternProperties": { - "^[a-zA-Z][a-zA-Z0-9_]*$": { - "type": "object", - "properties": { - "module": { - "type": "string", - "description": "Python module containing transport", - }, - "class": { - "type": "string", - "description": "Transport class name", - }, - "description": { - "type": "string", - "description": "Human-readable description", - }, - "factory": { - "type": "string", - "description": "Dotted path to factory function " - "(e.g., pkg.mod.create_transport)", - }, - "config_schema": { - "type": "object", - "description": "JSON schema for transport config", - }, - }, - "additionalProperties": False, - "required": ["module", "class"], - } - }, - "additionalProperties": False, - }, - "safety": { - "type": "object", - "properties": { - "enabled": {"type": "boolean"}, - "local_hosts": {"type": "array", "items": {"type": "string"}}, - "no_network": {"type": "boolean"}, - "header_denylist": {"type": "array", "items": {"type": "string"}}, - "proxy_env_denylist": { - "type": "array", - "items": {"type": "string"}, - }, - "env_allowlist": {"type": "array", "items": {"type": "string"}}, - }, - }, - "output": { - "type": "object", - "properties": { - "format": { - "type": "string", - "enum": ["json", "yaml", "csv", "xml"], - "description": "Output format for standardized reports", - }, - "directory": { - "type": "string", - "description": "Directory to save output files", - }, - "compress": { - "type": "boolean", - "description": "Whether to compress output files", - }, - "types": { - "type": "array", - "items": { - "type": "string", - "enum": [ - "fuzzing_results", - "error_report", - "safety_summary", - "performance_metrics", - "configuration_dump", - ], - }, - "description": "Specific output types to generate", - }, - "schema": { - "type": "string", - "description": "Path to custom output schema file", - }, - "retention": { - "type": "object", - "properties": { - "days": { - "type": "integer", - "description": "Number of days to retain output files", - }, - "max_size": { - "type": "string", - "description": ( - "Maximum size of output directory " - "(e.g., '1GB', '500MB')" - ), - }, - }, - }, - }, - }, - }, - } - -def load_custom_transports(config_data: dict[str, Any]) -> None: - """Load and register custom transports from configuration. - - Args: - config_data: Configuration dictionary containing custom_transports section - """ - custom_transports = config_data.get("custom_transports", {}) - - for transport_name, transport_config in custom_transports.items(): - try: - # Import the module - module_path = transport_config["module"] - class_name = transport_config["class"] - - module = importlib.import_module(module_path) - transport_class = getattr(module, class_name) - if not isinstance(transport_class, type): - raise ConfigFileError(f"{module_path}.{class_name} is not a class") - if not issubclass(transport_class, TransportProtocol): - raise ConfigFileError( - f"{module_path}.{class_name} must subclass TransportProtocol" - ) - - # Register the transport - description = transport_config.get("description", "") - config_schema = transport_config.get("config_schema") - factory_fn = None - factory_path = transport_config.get("factory") - if factory_path: - try: - mod_path, attr = factory_path.rsplit(".", 1) - except ValueError as ve: - raise ConfigFileError( - f"Invalid factory path '{factory_path}'; expected 'module.attr'" - ) from ve - fmod = importlib.import_module(mod_path) - factory_fn = getattr(fmod, attr) - if not callable(factory_fn): - raise ConfigFileError(f"Factory '{factory_path}' is not callable") - - register_custom_transport( - name=transport_name, - transport_class=transport_class, - description=description, - config_schema=config_schema, - factory_function=factory_fn, - ) - - logger.info( - f"Loaded custom transport '{transport_name}' from " - f"{module_path}.{class_name}" - ) - - except MCPError: - raise - except Exception as e: - logger.error(f"Failed to load custom transport '{transport_name}': {e}") - raise ConfigFileError( - f"Failed to load custom transport '{transport_name}': {e}" - ) from e diff --git a/mcp_fuzzer/config/loading/__init__.py b/mcp_fuzzer/config/loading/__init__.py new file mode 100644 index 0000000..69fc315 --- /dev/null +++ b/mcp_fuzzer/config/loading/__init__.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python3 +"""Configuration file loading and discovery.""" + +from .discovery import find_config_file, find_config_file_from_params +from .loader import ConfigLoader, apply_config_file +from .parser import load_config_file # noqa: F401 +from .search_params import ConfigSearchParams + +__all__ = [ + "ConfigLoader", + "find_config_file", + "find_config_file_from_params", + "load_config_file", + "apply_config_file", + "ConfigSearchParams", +] + diff --git a/mcp_fuzzer/config/loading/discovery.py b/mcp_fuzzer/config/loading/discovery.py new file mode 100644 index 0000000..58021ad --- /dev/null +++ b/mcp_fuzzer/config/loading/discovery.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 +"""Helpers for locating configuration files.""" + +from __future__ import annotations + +import os +from pathlib import Path + +from .search_params import ConfigSearchParams + + +def find_config_file( + config_path: str | None = None, + search_paths: list[str] | None = None, + file_names: list[str] | None = None, +) -> str | None: + """Find a configuration file in the given paths. + + Args: + config_path: Explicit path to config file, takes precedence if provided + search_paths: List of directories to search for config files + file_names: List of file names to search for + + Returns: + Path to the found config file or None if not found + """ + params = ConfigSearchParams( + config_path=config_path, + search_paths=search_paths, + file_names=file_names, + ) + return _find_config_file_impl(params) + + +def find_config_file_from_params(params: ConfigSearchParams) -> str | None: + """Find a configuration file using ConfigSearchParams. + + Args: + params: Configuration search parameters + + Returns: + Path to the found config file or None if not found + """ + return _find_config_file_impl(params) + + +def _find_config_file_impl(params: ConfigSearchParams) -> str | None: + """Internal implementation of config file discovery. + + Args: + params: Configuration search parameters + + Returns: + Path to the found config file or None if not found + """ + if params.config_path and os.path.isfile(params.config_path): + return params.config_path + + search_paths = params.search_paths + if search_paths is None: + search_paths = [ + os.getcwd(), + str(Path.home() / ".config" / "mcp-fuzzer"), + ] + + file_names = params.file_names + if file_names is None: + file_names = ["mcp-fuzzer.yml", "mcp-fuzzer.yaml"] + + for path in search_paths: + if not os.path.isdir(path): + continue + for name in file_names: + file_path = os.path.join(path, name) + if os.path.isfile(file_path): + return file_path + + return None diff --git a/mcp_fuzzer/config/loading/loader.py b/mcp_fuzzer/config/loading/loader.py new file mode 100644 index 0000000..10cec89 --- /dev/null +++ b/mcp_fuzzer/config/loading/loader.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 +"""Configuration loader helpers that glue the discovery/parser stack.""" + +from __future__ import annotations + +import logging +from typing import Any, Callable, Tuple + +from ..core.manager import Configuration, config +from ..extensions.transports import load_custom_transports +from ..schema.composer import get_config_schema # noqa: F401 (exported for consumers) +from .discovery import find_config_file +from .parser import load_config_file +from .search_params import ConfigSearchParams +from ...exceptions import ConfigFileError, MCPError + +logger = logging.getLogger(__name__) + +ConfigDict = dict[str, Any] +FileDiscoverer = Callable[ + [str | None, list[str] | None, list[str] | None], str | None +] +ConfigParser = Callable[[str], ConfigDict] +TransportLoader = Callable[[ConfigDict], None] + + +class ConfigLoader: + """Load configuration files with injectable discovery and parser implementations. + + This class reduces coupling by accepting a Configuration instance rather than + always using the global config object. + """ + + def __init__( + self, + discoverer: FileDiscoverer | None = None, + parser: ConfigParser | None = None, + transport_loader: TransportLoader | None = None, + config_instance: Configuration | None = None, + ): + self.discoverer = discoverer or find_config_file + self.parser = parser or load_config_file + self.transport_loader = transport_loader or load_custom_transports + self.config = config_instance or config + + def load( + self, + config_path: str | None = None, + search_paths: list[str] | None = None, + file_names: list[str] | None = None, + ) -> Tuple[ConfigDict | None, str | None]: + """Return the configuration dictionary and source file path. + + Args: + config_path: Explicit path to config file + search_paths: List of directories to search + file_names: List of file names to search for + + Returns: + Tuple of (config_dict, file_path) or (None, None) if not found + """ + file_path = self.discoverer(config_path, search_paths, file_names) + if not file_path: + logger.debug("No configuration file found") + return None, None + + logger.debug("Loading configuration from %s", file_path) + try: + config_data = self.parser(file_path) + self.transport_loader(config_data) + except (ConfigFileError, MCPError): + logger.exception("Failed to load configuration from %s", file_path) + raise + + return config_data, file_path + + def load_from_params( + self, params: ConfigSearchParams + ) -> Tuple[ + ConfigDict | None, str | None + ]: + """Load configuration using ConfigSearchParams. + + Args: + params: Configuration search parameters + + Returns: + Tuple of (config_dict, file_path) or (None, None) if not found + """ + return self.load( + config_path=params.config_path, + search_paths=params.search_paths, + file_names=params.file_names, + ) + + def apply( + self, + config_path: str | None = None, + search_paths: list[str] | None = None, + file_names: list[str] | None = None, + ) -> bool: + """Load configuration and merge it into the runtime state. + + Args: + config_path: Explicit path to config file + search_paths: List of directories to search + file_names: List of file names to search for + + Returns: + True if configuration was loaded and applied, False otherwise + """ + try: + config_data, file_path = self.load(config_path, search_paths, file_names) + except (ConfigFileError, MCPError) as e: + logger.debug("Failed to apply configuration: %s", e) + return False + + if not file_path: + return False + + self.config.update(config_data or {}) + return True + + def apply_from_params(self, params: ConfigSearchParams) -> bool: + """Apply configuration using ConfigSearchParams. + + Args: + params: Configuration search parameters + + Returns: + True if configuration was loaded and applied, False otherwise + """ + return self.apply( + config_path=params.config_path, + search_paths=params.search_paths, + file_names=params.file_names, + ) + + +def apply_config_file( + config_path: str | None = None, + search_paths: list[str] | None = None, + file_names: list[str] | None = None, +) -> bool: + """Convenience helper that uses the default loader to update global config. + + Args: + config_path: Explicit path to config file + search_paths: List of directories to search + file_names: List of file names to search for + + Returns: + True if configuration was loaded and applied, False otherwise + """ + loader = ConfigLoader() + return loader.apply( + config_path=config_path, + search_paths=search_paths, + file_names=file_names, + ) diff --git a/mcp_fuzzer/config/loading/parser.py b/mcp_fuzzer/config/loading/parser.py new file mode 100644 index 0000000..cb48ee0 --- /dev/null +++ b/mcp_fuzzer/config/loading/parser.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 +"""YAML configuration parsing utilities.""" + +from __future__ import annotations + +import os +from typing import Any + +import yaml + +from ...exceptions import ConfigFileError + + +def load_config_file(file_path: str) -> dict[str, Any]: + """Load configuration from a YAML file. + + Args: + file_path: Path to the configuration file + + Returns: + Dictionary containing the configuration + + Raises: + ConfigFileError: If the file cannot be found, parsed, or has permission issues + """ + if not os.path.isfile(file_path): + raise ConfigFileError(f"Configuration file not found: {file_path}") + + if not file_path.endswith((".yml", ".yaml")): + raise ConfigFileError( + f"Unsupported configuration file format: {file_path}. " + "Only YAML files with .yml or .yaml extensions are supported." + ) + + try: + with open(file_path, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + + # Validate that top-level config is a mapping/object + if not isinstance(data, dict): + raise ConfigFileError( + f"Top-level configuration in {file_path} must be a mapping/object, " + f"got {type(data).__name__}" + ) + + return data + except yaml.YAMLError as e: + raise ConfigFileError( + f"Error parsing YAML configuration file {file_path}: {e}" + ) from e + except PermissionError as e: + raise ConfigFileError( + f"Permission denied when reading configuration file: {file_path}" + ) from e + except ConfigFileError: + # Re-raise ConfigFileError as-is (already has proper context) + raise + except Exception as e: + raise ConfigFileError( + f"Unexpected error reading configuration file {file_path}: {e}" + ) from e diff --git a/mcp_fuzzer/config/loading/search_params.py b/mcp_fuzzer/config/loading/search_params.py new file mode 100644 index 0000000..f41d051 --- /dev/null +++ b/mcp_fuzzer/config/loading/search_params.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +"""Configuration search parameters dataclass to group related parameters.""" + +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass +class ConfigSearchParams: + """Parameters for searching and loading configuration files. + + Groups related configuration search parameters to reduce parameter + list length and improve code clarity. + + Attributes: + config_path: Explicit path to config file, takes precedence if provided + search_paths: List of directories to search for config files + file_names: List of file names to search for + """ + + config_path: str | None = None + search_paths: list[str] | None = None + file_names: list[str] | None = None + diff --git a/mcp_fuzzer/config/manager.py b/mcp_fuzzer/config/manager.py deleted file mode 100644 index f08506f..0000000 --- a/mcp_fuzzer/config/manager.py +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python3 -"""Configuration management for MCP Fuzzer.""" - -import os -from typing import Any - -class Configuration: - """Centralized configuration management for MCP Fuzzer.""" - - def __init__(self): - self._config: dict[str, Any] = {} - self._load_from_env() - - def _load_from_env(self) -> None: - """Load configuration values from environment variables.""" - - def _get_float(key: str, default: float) -> float: - try: - return float(os.getenv(key, str(default))) - except (TypeError, ValueError): - return default - - def _get_bool(key: str, default: bool = False) -> bool: - val = os.getenv(key) - if val is None: - return default - return val.strip().lower() in {"1", "true", "yes", "on"} - - self._config["timeout"] = _get_float("MCP_FUZZER_TIMEOUT", 30.0) - self._config["log_level"] = os.getenv("MCP_FUZZER_LOG_LEVEL", "INFO") - self._config["safety_enabled"] = _get_bool("MCP_FUZZER_SAFETY_ENABLED", False) - self._config["fs_root"] = os.getenv( - "MCP_FUZZER_FS_ROOT", os.path.expanduser("~/.mcp_fuzzer") - ) - self._config["http_timeout"] = _get_float("MCP_FUZZER_HTTP_TIMEOUT", 30.0) - self._config["sse_timeout"] = _get_float("MCP_FUZZER_SSE_TIMEOUT", 30.0) - self._config["stdio_timeout"] = _get_float("MCP_FUZZER_STDIO_TIMEOUT", 30.0) - - def get(self, key: str, default: Any = None) -> Any: - """Get a configuration value by key.""" - return self._config.get(key, default) - - def set(self, key: str, value: Any) -> None: - """Set a configuration value.""" - self._config[key] = value - - def update(self, config_dict: dict[str, Any]) -> None: - """Update configuration with values from a dictionary.""" - self._config.update(config_dict) - -# Global configuration instance -config = Configuration() \ No newline at end of file diff --git a/mcp_fuzzer/config/schema/__init__.py b/mcp_fuzzer/config/schema/__init__.py new file mode 100644 index 0000000..9e033f8 --- /dev/null +++ b/mcp_fuzzer/config/schema/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 +"""Configuration schema validation.""" + +from .composer import get_config_schema + +__all__ = ["get_config_schema"] + diff --git a/mcp_fuzzer/config/schema/builders.py b/mcp_fuzzer/config/schema/builders.py new file mode 100644 index 0000000..ce780e7 --- /dev/null +++ b/mcp_fuzzer/config/schema/builders.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python3 +"""Schema builder functions for configuration validation.""" + +from __future__ import annotations + +from typing import Any + + +def build_timeout_schema() -> dict[str, Any]: + """Build schema for timeout-related configuration.""" + return { + "timeout": {"type": "number", "description": "Default timeout in seconds"}, + "tool_timeout": { + "type": "number", + "description": "Tool-specific timeout in seconds", + }, + "http_timeout": { + "type": "number", + "description": "HTTP transport timeout in seconds", + }, + "sse_timeout": { + "type": "number", + "description": "SSE transport timeout in seconds", + }, + "stdio_timeout": { + "type": "number", + "description": "STDIO transport timeout in seconds", + }, + } + + +def build_basic_schema() -> dict[str, Any]: + """Build schema for basic configuration properties. + + Note: `safety_enabled` is a top-level convenience flag. If both + `safety_enabled` and `safety.enabled` are specified, `safety.enabled` + takes precedence. + """ + return { + "log_level": { + "type": "string", + "enum": ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + }, + "safety_enabled": { + "type": "boolean", + "description": ( + "Whether safety features are enabled (convenience flag). " + "If both safety_enabled and safety.enabled are specified, " + "safety.enabled takes precedence." + ), + }, + "fs_root": { + "type": "string", + "description": "Root directory for file operations", + }, + } + + +def build_fuzzing_schema() -> dict[str, Any]: + """Build schema for fuzzing-related configuration.""" + return { + "mode": {"type": "string", "enum": ["tools", "protocol", "both"]}, + "phase": {"type": "string", "enum": ["realistic", "aggressive", "both"]}, + "protocol": { + "type": "string", + "enum": ["http", "https", "sse", "stdio", "streamablehttp"], + }, + "endpoint": {"type": "string", "description": "Server endpoint URL"}, + "runs": {"type": "integer", "description": "Number of fuzzing runs"}, + "runs_per_type": { + "type": "integer", + "description": "Number of runs per protocol type", + }, + "protocol_type": { + "type": "string", + "description": "Specific protocol type to fuzz", + }, + "max_concurrency": { + "type": "integer", + "description": "Maximum concurrent operations", + }, + } + + +def build_network_schema() -> dict[str, Any]: + """Build schema for network-related configuration.""" + return { + "no_network": {"type": "boolean", "description": "Disable network access"}, + "allow_hosts": { + "type": "array", + "items": {"type": "string"}, + "description": "List of allowed hosts", + }, + } + + +def build_auth_schema() -> dict[str, Any]: + """Build schema for authentication configuration.""" + return { + "auth": { + "type": "object", + "properties": { + "providers": { + "type": "array", + "items": { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": ["api_key", "basic", "oauth", "custom"], + }, + "id": {"type": "string"}, + "config": {"type": "object"}, + }, + "required": ["type", "id"], + }, + }, + "mappings": { + "type": "object", + "additionalProperties": {"type": "string"}, + }, + }, + }, + } + + +def build_custom_transports_schema() -> dict[str, Any]: + """Build schema for custom transport configuration.""" + return { + "custom_transports": { + "type": "object", + "description": "Configuration for custom transport mechanisms", + "patternProperties": { + "^[a-zA-Z][a-zA-Z0-9_]*$": { + "type": "object", + "properties": { + "module": { + "type": "string", + "description": "Python module containing transport", + }, + "class": { + "type": "string", + "description": "Transport class name", + }, + "description": { + "type": "string", + "description": "Human-readable description", + }, + "factory": { + "type": "string", + "description": "Dotted path to factory function " + "(e.g., pkg.mod.create_transport)", + }, + "config_schema": { + "type": "object", + "description": "JSON schema for transport config", + }, + }, + "additionalProperties": False, + "required": ["module", "class"], + } + }, + "additionalProperties": False, + }, + } + + +def build_safety_schema() -> dict[str, Any]: + """Build schema for safety configuration. + + Note: `safety.enabled` takes precedence over top-level `safety_enabled` + if both are specified. + """ + return { + "safety": { + "type": "object", + "properties": { + "enabled": { + "type": "boolean", + "description": ( + "Whether safety features are enabled. " + "Takes precedence over top-level safety_enabled " + "if both are specified." + ), + }, + "local_hosts": {"type": "array", "items": {"type": "string"}}, + "no_network": {"type": "boolean"}, + "header_denylist": {"type": "array", "items": {"type": "string"}}, + "proxy_env_denylist": { + "type": "array", + "items": {"type": "string"}, + }, + "env_allowlist": {"type": "array", "items": {"type": "string"}}, + }, + "additionalProperties": False, + }, + } + + +def build_output_schema() -> dict[str, Any]: + """Build schema for output configuration.""" + return { + "output": { + "type": "object", + "properties": { + "format": { + "type": "string", + "enum": ["json", "yaml", "csv", "xml"], + "description": "Output format for standardized reports", + }, + "directory": { + "type": "string", + "description": "Directory to save output files", + }, + "compress": { + "type": "boolean", + "description": "Whether to compress output files", + }, + "types": { + "type": "array", + "items": { + "type": "string", + "enum": [ + "fuzzing_results", + "error_report", + "safety_summary", + "performance_metrics", + "configuration_dump", + ], + }, + "description": "Specific output types to generate", + }, + "schema": { + "type": "string", + "description": "Path to custom output schema file", + }, + "retention": { + "type": "object", + "properties": { + "days": { + "type": "integer", + "description": "Number of days to retain output files", + }, + "max_size": { + "type": "string", + "description": ( + "Maximum size of output directory " + "(e.g., '1GB', '500MB')" + ), + }, + }, + "additionalProperties": False, + }, + }, + "additionalProperties": False, + }, + } + diff --git a/mcp_fuzzer/config/schema/composer.py b/mcp_fuzzer/config/schema/composer.py new file mode 100644 index 0000000..2700dd3 --- /dev/null +++ b/mcp_fuzzer/config/schema/composer.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +"""Schema composer that combines all schema builders.""" + +from __future__ import annotations + +from typing import Any + +from .builders import ( + build_auth_schema, + build_basic_schema, + build_custom_transports_schema, + build_fuzzing_schema, + build_network_schema, + build_output_schema, + build_safety_schema, + build_timeout_schema, +) + + +def get_config_schema() -> dict[str, Any]: + """Return the JSON schema describing the configuration structure. + + The schema is built by composing smaller schema builders for logical + groupings of configuration properties. + + Returns: + Complete JSON schema dictionary for configuration validation + """ + properties = {} + properties.update(build_timeout_schema()) + properties.update(build_basic_schema()) + properties.update(build_fuzzing_schema()) + properties.update(build_network_schema()) + properties.update(build_auth_schema()) + properties.update(build_custom_transports_schema()) + properties.update(build_safety_schema()) + properties.update(build_output_schema()) + + return { + "type": "object", + "properties": properties, + } + diff --git a/mcp_fuzzer/fuzz_engine/runtime/watchdog.py b/mcp_fuzzer/fuzz_engine/runtime/watchdog.py index fc7129d..e23f018 100644 --- a/mcp_fuzzer/fuzz_engine/runtime/watchdog.py +++ b/mcp_fuzzer/fuzz_engine/runtime/watchdog.py @@ -19,7 +19,12 @@ import time from typing import Any, Awaitable, Callable, Protocol -from ...config.constants import PROCESS_FORCE_KILL_TIMEOUT, PROCESS_TERMINATION_TIMEOUT +# Import constants directly from config (constants are values, not behavior) +# Behavior (functions/classes) should go through client mediator +from ...config.core.constants import ( + PROCESS_FORCE_KILL_TIMEOUT, + PROCESS_TERMINATION_TIMEOUT, +) from ...exceptions import MCPError, ProcessStopError, WatchdogStartError from .config import WatchdogConfig from .registry import ProcessRecord, ProcessRegistry diff --git a/mcp_fuzzer/fuzz_engine/strategy/realistic/protocol_type_strategy.py b/mcp_fuzzer/fuzz_engine/strategy/realistic/protocol_type_strategy.py index 213db37..095704e 100644 --- a/mcp_fuzzer/fuzz_engine/strategy/realistic/protocol_type_strategy.py +++ b/mcp_fuzzer/fuzz_engine/strategy/realistic/protocol_type_strategy.py @@ -10,7 +10,8 @@ from typing import Any from hypothesis import strategies as st -from ....config import DEFAULT_PROTOCOL_VERSION +# Import constants directly from config (constants are values, not behavior) +from ....config.core.constants import DEFAULT_PROTOCOL_VERSION # Helper to keep URIs local-only SAFE_FILE_URIS = [ diff --git a/mcp_fuzzer/reports/reporter/__init__.py b/mcp_fuzzer/reports/reporter/__init__.py index ae0207f..11841c9 100644 --- a/mcp_fuzzer/reports/reporter/__init__.py +++ b/mcp_fuzzer/reports/reporter/__init__.py @@ -70,9 +70,11 @@ def __init__( """ # Dependency injection: use provided config or fall back to global if config_provider is None: - from ...config import config as default_config + from ...client.adapters import config_mediator - config_provider = default_config + # Use config_mediator as the provider + # (it implements dict-like interface via get()) + config_provider = config_mediator resolved_config = config or ReporterConfig.from_provider( provider=config_provider, diff --git a/mcp_fuzzer/safety_system/policy.py b/mcp_fuzzer/safety_system/policy.py index 618f1f1..e918dc8 100644 --- a/mcp_fuzzer/safety_system/policy.py +++ b/mcp_fuzzer/safety_system/policy.py @@ -12,7 +12,9 @@ import os from collections.abc import Iterable -from ..config import ( +# Import constants directly from config (constants are values, not behavior) +# Behavior (functions/classes) should go through client mediator +from ..config.core.constants import ( SAFETY_LOCAL_HOSTS, SAFETY_NO_NETWORK_DEFAULT, SAFETY_PROXY_ENV_DENYLIST, diff --git a/mcp_fuzzer/transport/http.py b/mcp_fuzzer/transport/http.py index 417def1..db91cb5 100644 --- a/mcp_fuzzer/transport/http.py +++ b/mcp_fuzzer/transport/http.py @@ -10,7 +10,8 @@ from .base import TransportProtocol from .mixins import NetworkTransportMixin, ResponseParsingMixin from ..fuzz_engine.runtime import ProcessManager, WatchdogConfig -from ..config import ( +# Import constants directly from config (constants are values, not behavior) +from ..config.core.constants import ( JSON_CONTENT_TYPE, DEFAULT_HTTP_ACCEPT, ) diff --git a/mcp_fuzzer/transport/stdio.py b/mcp_fuzzer/transport/stdio.py index c9aa0a2..80b8e17 100644 --- a/mcp_fuzzer/transport/stdio.py +++ b/mcp_fuzzer/transport/stdio.py @@ -20,7 +20,8 @@ ) from ..fuzz_engine.runtime import ProcessManager, WatchdogConfig from ..safety_system.policy import sanitize_subprocess_env -from ..config.constants import PROCESS_WAIT_TIMEOUT +# Import constants directly from config (constants are values, not behavior) +from ..config.core.constants import PROCESS_WAIT_TIMEOUT from .manager import TransportManager class StdioTransport(TransportProtocol): diff --git a/mcp_fuzzer/transport/streamable_http.py b/mcp_fuzzer/transport/streamable_http.py index 2e3be62..a0ba714 100644 --- a/mcp_fuzzer/transport/streamable_http.py +++ b/mcp_fuzzer/transport/streamable_http.py @@ -4,7 +4,8 @@ from typing import Any import httpx -from ..config import ( +# Import constants directly from config (constants are values, not behavior) +from ..config.core.constants import ( DEFAULT_PROTOCOL_VERSION, CONTENT_TYPE_HEADER, JSON_CONTENT_TYPE, diff --git a/tests/integration/test_custom_transport.py b/tests/integration/test_custom_transport.py index c1af8e7..58c748c 100644 --- a/tests/integration/test_custom_transport.py +++ b/tests/integration/test_custom_transport.py @@ -5,7 +5,8 @@ import os from pathlib import Path -from mcp_fuzzer.config import apply_config_file, load_custom_transports +from mcp_fuzzer.client.adapters import config_mediator +from mcp_fuzzer.config.extensions.transports import load_custom_transports from mcp_fuzzer.exceptions import ConfigFileError, TransportRegistrationError from mcp_fuzzer.transport import create_transport, register_custom_transport from mcp_fuzzer.transport.base import TransportProtocol @@ -97,7 +98,7 @@ def test_config_file_custom_transport_loading(self): try: # Load config and custom transports from the file we wrote - assert apply_config_file(config_path=config_path) is True + assert config_mediator.apply_file(config_path=config_path) is True # Test that transport was loaded from mcp_fuzzer.transport import list_custom_transports diff --git a/tests/integration/test_standardized_output.py b/tests/integration/test_standardized_output.py index 5881e61..898e172 100644 --- a/tests/integration/test_standardized_output.py +++ b/tests/integration/test_standardized_output.py @@ -123,10 +123,10 @@ def test_output_directory_structure(self): def test_configuration_driven_output(self): """Test that output generation respects configuration settings.""" - from mcp_fuzzer.config import config + from mcp_fuzzer.client.adapters import config_mediator # Set output configuration - config.update({ + config_mediator.update({ "output": { "format": "json", "directory": self.temp_dir, diff --git a/tests/unit/cli/test_cli.py b/tests/unit/cli/test_cli.py index d255a14..a0ce7e3 100644 --- a/tests/unit/cli/test_cli.py +++ b/tests/unit/cli/test_cli.py @@ -268,7 +268,7 @@ def test_build_cli_config_merges_and_returns_cli_config(): def test_handle_validate_config(monkeypatch): validator = ValidationManager() with patch( - "mcp_fuzzer.cli.validators.load_config_file" + "mcp_fuzzer.cli.validators.config_mediator.load_file" ) as mock_load: validator.validate_config_file("config.yml") mock_load.assert_called_once_with("config.yml") @@ -545,7 +545,7 @@ def test_run_cli_unexpected_error_debug(monkeypatch, caplog): def test_build_cli_config_uses_config_file(monkeypatch): args = _base_args(config="custom.yml", endpoint=None) with patch( - "mcp_fuzzer.cli.config_merge.load_config_file", + "mcp_fuzzer.cli.config_merge.config_mediator.load_file", return_value={ "endpoint": "http://conf", "runs": 42, @@ -560,18 +560,20 @@ def test_build_cli_config_uses_config_file(monkeypatch): def test_build_cli_config_handles_apply_config_error(caplog): caplog.set_level(logging.DEBUG) args = _base_args(config=None) + # apply_file() now returns False instead of raising exceptions with patch( - "mcp_fuzzer.cli.config_merge.apply_config_file", - side_effect=Exception("fail"), + "mcp_fuzzer.cli.config_merge.config_mediator.apply_file", + return_value=False, ): cli_config = build_cli_config(args) assert cli_config.merged["endpoint"] == "http://localhost" - assert "fail" in "".join(caplog.messages) + # Check that debug message was logged when config file is not found + assert "Default configuration file not found" in "".join(caplog.messages) def test_build_cli_config_raises_config_error(): args = _base_args(config="bad.yml") with patch( - "mcp_fuzzer.cli.config_merge.load_config_file", + "mcp_fuzzer.cli.config_merge.config_mediator.load_file", side_effect=ValueError("bad"), ): with pytest.raises(Exception): diff --git a/tests/unit/client/test_config_adapter.py b/tests/unit/client/test_config_adapter.py new file mode 100644 index 0000000..b73cd1f --- /dev/null +++ b/tests/unit/client/test_config_adapter.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +"""Unit tests for config adapter and port.""" + +from __future__ import annotations + +from pathlib import Path +from textwrap import dedent +from unittest.mock import Mock, patch + +import pytest + +from mcp_fuzzer.client.adapters import ConfigAdapter, config_mediator +from mcp_fuzzer.client.ports import ConfigPort +from mcp_fuzzer.exceptions import ConfigFileError + + +def test_config_adapter_implements_port(): + """Test that ConfigAdapter implements ConfigPort interface.""" + adapter = ConfigAdapter() + assert isinstance(adapter, ConfigPort) + + +def test_config_adapter_get_set(): + """Test ConfigAdapter get and set methods.""" + adapter = ConfigAdapter() + adapter.set("test_key", "test_value") + assert adapter.get("test_key") == "test_value" + assert adapter.get("nonexistent", "default") == "default" + + +def test_config_adapter_update(): + """Test ConfigAdapter update method.""" + adapter = ConfigAdapter() + adapter.update({"key1": "value1", "key2": "value2"}) + assert adapter.get("key1") == "value1" + assert adapter.get("key2") == "value2" + + +def test_config_adapter_load_file(tmp_path): + """Test ConfigAdapter load_file method.""" + config_file = tmp_path / "test.yaml" + config_file.write_text("timeout: 30.0\nlog_level: DEBUG") + + adapter = ConfigAdapter() + config_data = adapter.load_file(str(config_file)) + assert config_data["timeout"] == 30.0 + assert config_data["log_level"] == "DEBUG" + + +def test_config_adapter_load_file_not_found(): + """Test ConfigAdapter load_file raises error for missing file.""" + adapter = ConfigAdapter() + with pytest.raises(ConfigFileError): + adapter.load_file("/nonexistent/path.yaml") + + +def test_config_adapter_apply_file(tmp_path): + """Test ConfigAdapter apply_file method.""" + config_file = tmp_path / "test.yaml" + config_file.write_text("timeout: 45.0\nlog_level: INFO") + + adapter = ConfigAdapter() + result = adapter.apply_file(config_path=str(config_file)) + assert result is True + assert adapter.get("timeout") == 45.0 + assert adapter.get("log_level") == "INFO" + + +def test_config_adapter_apply_file_not_found(): + """Test ConfigAdapter apply_file returns False for missing file.""" + adapter = ConfigAdapter() + result = adapter.apply_file(config_path="/nonexistent/path.yaml") + assert result is False + + +def test_config_adapter_get_schema(): + """Test ConfigAdapter get_schema method.""" + adapter = ConfigAdapter() + schema = adapter.get_schema() + assert isinstance(schema, dict) + assert schema["type"] == "object" + assert "properties" in schema + + +def test_config_adapter_custom_instance(): + """Test ConfigAdapter with custom config instance.""" + mock_config = Mock() + mock_config.get = Mock(return_value="custom_value") + mock_config.set = Mock() + mock_config.update = Mock() + + adapter = ConfigAdapter(config_instance=mock_config) + assert adapter.get("test_key") == "custom_value" + mock_config.get.assert_called_once_with("test_key", None) + + +def test_config_mediator_is_global_instance(): + """Test that config_mediator is a global ConfigAdapter instance.""" + assert isinstance(config_mediator, ConfigAdapter) + assert isinstance(config_mediator, ConfigPort) + + +def test_config_mediator_functionality(tmp_path): + """Test that global config_mediator works correctly.""" + config_file = tmp_path / "test.yaml" + config_file.write_text("timeout: 60.0") + + # Test apply_file + result = config_mediator.apply_file(config_path=str(config_file)) + assert result is True + + # Test get + assert config_mediator.get("timeout") == 60.0 + + # Test set + config_mediator.set("custom_key", "custom_value") + assert config_mediator.get("custom_key") == "custom_value" + + # Test update + config_mediator.update({"key1": "val1", "key2": "val2"}) + assert config_mediator.get("key1") == "val1" + assert config_mediator.get("key2") == "val2" + + +def test_config_mediator_as_mapping(): + """Test that config_mediator can be used as a Mapping-like object.""" + config_mediator.set("test_key", "test_value") + # Should work with .get() method (Mapping interface) + assert config_mediator.get("test_key") == "test_value" + assert config_mediator.get("nonexistent", "default") == "default" + diff --git a/tests/unit/config/test_config_loader.py b/tests/unit/config/test_config_loader.py index 50f08df..fb7e8c6 100644 --- a/tests/unit/config/test_config_loader.py +++ b/tests/unit/config/test_config_loader.py @@ -1,203 +1,260 @@ #!/usr/bin/env python3 -"""Tests for the configuration loader module.""" +"""Unit tests that exercise the new config loader helpers.""" + +from __future__ import annotations import os -import tempfile +from pathlib import Path +from textwrap import dedent +from typing import Any + import pytest -from unittest.mock import patch, mock_open +from unittest.mock import Mock, patch from mcp_fuzzer.config import ( - find_config_file, - load_config_file, + ConfigLoader, + ConfigSearchParams, apply_config_file, + find_config_file, get_config_schema, - config, + load_config_file, ) -from mcp_fuzzer.exceptions import ConfigFileError, ValidationError +from mcp_fuzzer.config.core.manager import Configuration +from mcp_fuzzer.exceptions import ConfigFileError @pytest.fixture -def config_files(): - """Create temporary YAML files for testing.""" - # Create temporary directory - temp_dir = tempfile.TemporaryDirectory() - - # YAML content - yaml_content = """ -timeout: 60.0 -log_level: "DEBUG" -safety: - enabled: true - no_network: false - local_hosts: - - "localhost" - - "127.0.0.1" -""" - # Create .yml file - yml_path = os.path.join(temp_dir.name, "mcp-fuzzer.yml") - with open(yml_path, "w") as f: - f.write(yaml_content) - - # Create .yaml file - yaml_path = os.path.join(temp_dir.name, "mcp-fuzzer.yaml") - with open(yaml_path, "w") as f: - f.write(yaml_content) - - # Return paths - yield { - "temp_dir": temp_dir, - "yml_path": yml_path, - "yaml_path": yaml_path, +def config_files(tmp_path: Path) -> dict[str, Any]: + """Create a couple of temporary YAML config files for reuse.""" + content = dedent( + """ + timeout: 60.0 + log_level: "DEBUG" + safety: + enabled: true + no_network: false + local_hosts: + - "localhost" + """ + ).strip() + + yml_path = tmp_path / "mcp-fuzzer.yml" + yaml_path = tmp_path / "mcp-fuzzer.yaml" + yml_path.write_text(content) + yaml_path.write_text(content) + + return { + "temp_dir": tmp_path, + "yml_path": str(yml_path), + "yaml_path": str(yaml_path), } - # Cleanup - temp_dir.cleanup() - def test_find_config_file_explicit_path(config_files): - """Test finding a config file with an explicit path.""" - # Test with explicit path - found_path = find_config_file(config_path=config_files["yaml_path"]) - assert found_path == config_files["yaml_path"] - - # Test with non-existent path - found_path = find_config_file(config_path="/non/existent/path") - assert found_path is None + result = find_config_file(config_path=config_files["yaml_path"]) + assert result == config_files["yaml_path"] def test_find_config_file_search_paths(config_files): - """Test finding a config file in search paths.""" - # Test with search paths - found_path = find_config_file(search_paths=[config_files["temp_dir"].name]) - assert found_path in [config_files["yml_path"], config_files["yaml_path"]] + result = find_config_file(search_paths=[str(config_files["temp_dir"])]) + assert result in [config_files["yml_path"], config_files["yaml_path"]] - # Test with empty search paths - found_path = find_config_file(search_paths=["/non/existent/path"]) - assert found_path is None +def test_find_config_file_default_search(monkeypatch, tmp_path): + """Ensure the default search uses the current working directory.""" + config_path = tmp_path / "mcp-fuzzer.yml" + config_path.write_text("timeout: 5") + + monkeypatch.chdir(tmp_path) + assert find_config_file() == str(config_path) -def test_load_config_file_yml(config_files): - """Test loading a .yml config file.""" - config_data = load_config_file(config_files["yml_path"]) - assert config_data["timeout"] == 60.0 - assert config_data["log_level"] == "DEBUG" - assert config_data["safety"]["enabled"] is True - assert config_data["safety"]["no_network"] is False - assert config_data["safety"]["local_hosts"] == ["localhost", "127.0.0.1"] + +def test_find_config_file_missing(tmp_path): + result = find_config_file( + search_paths=[str(tmp_path)], + file_names=["nonexistent.yml"], + ) + assert result is None def test_load_config_file_yaml(config_files): - """Test loading a .yaml config file.""" - config_data = load_config_file(config_files["yaml_path"]) - assert config_data["timeout"] == 60.0 + config_data = load_config_file(config_files["yml_path"]) assert config_data["log_level"] == "DEBUG" + assert config_data["timeout"] == 60.0 assert config_data["safety"]["enabled"] is True - assert config_data["safety"]["no_network"] is False - assert config_data["safety"]["local_hosts"] == ["localhost", "127.0.0.1"] -def test_load_config_file_invalid_format(config_files): - """Test loading a config file with an invalid format.""" - # Create a file with an invalid extension - invalid_path = os.path.join(config_files["temp_dir"].name, "invalid.txt") - with open(invalid_path, "w") as f: - f.write("invalid content") - +def test_load_config_file_invalid_extension(tmp_path): + invalid = tmp_path / "config.txt" + invalid.write_text("timeout: 1") with pytest.raises(ConfigFileError): - load_config_file(invalid_path) + load_config_file(str(invalid)) def test_load_config_file_not_found(): - """Test loading a non-existent config file.""" - with pytest.raises(ConfigFileError): - load_config_file("/non/existent/path") - - -def test_load_config_file_invalid_yaml(config_files): - """Test loading an invalid YAML file.""" - invalid_yaml_path = os.path.join(config_files["temp_dir"].name, "invalid.yaml") - with open(invalid_yaml_path, "w") as f: - f.write("invalid: yaml: content:") - with pytest.raises(ConfigFileError): - load_config_file(invalid_yaml_path) - + load_config_file("/non/existent/path.yaml") -def test_load_config_file_invalid_extension(config_files): - """Test loading a file with invalid extension.""" - invalid_ext_path = os.path.join(config_files["temp_dir"].name, "config.txt") - with open(invalid_ext_path, "w") as f: - f.write("valid: yaml") +def test_load_config_file_invalid_yaml(tmp_path): + invalid = tmp_path / "broken.yaml" + invalid.write_text("timeout: [1,") with pytest.raises(ConfigFileError): - load_config_file(invalid_ext_path) + load_config_file(str(invalid)) -@patch("mcp_fuzzer.config.loader.config") -def test_apply_config_file(mock_config, config_files): - """Test applying a config file.""" - # Test with explicit path +@patch("mcp_fuzzer.config.loading.loader.load_custom_transports") +@patch("mcp_fuzzer.config.loading.loader.config") +def test_apply_config_file_updates_global_state( + mock_config, mock_transports, config_files +): result = apply_config_file(config_path=config_files["yaml_path"]) assert result is True - mock_config.update.assert_called_once() - - # Reset mock - mock_config.reset_mock() - - # Test with search paths - result = apply_config_file(search_paths=[config_files["temp_dir"].name]) - assert result is True - mock_config.update.assert_called_once() + mock_transports.assert_called_once() + updated = mock_config.update.call_args[0][0] + assert updated.get("timeout") == 60.0 + assert mock_config.update.call_count == 1 - # Reset mock - mock_config.reset_mock() - # Test with non-existent path - result = apply_config_file(config_path="/non/existent/path") +@patch("mcp_fuzzer.config.loading.loader.load_custom_transports") +@patch("mcp_fuzzer.config.loading.loader.config") +def test_apply_config_file_returns_false_when_missing(mock_config, mock_transports): + result = apply_config_file(config_path="/nope.yaml") assert result is False mock_config.update.assert_not_called() + mock_transports.assert_not_called() -def test_apply_config_file_handles_load_errors(config_files): - """apply_config_file should return False if load_config_file fails.""" - with patch( - "mcp_fuzzer.config.loader.config" - ) as mock_config, patch( - "mcp_fuzzer.config.loader.load_config_file" - ) as mock_load_config, patch( - "mcp_fuzzer.config.loader.load_custom_transports" - ) as mock_load_custom: - mock_load_config.side_effect = ConfigFileError("boom") - result = apply_config_file(config_path=config_files["yaml_path"]) - - assert result is False +def test_get_config_schema_includes_expected_fields(): + schema = get_config_schema() + props = schema["properties"] + assert props["timeout"]["type"] == "number" + assert "custom_transports" in props + assert props["custom_transports"]["patternProperties"] + + +def test_config_loader_load_invokes_dependencies(): + parser = Mock(return_value={"timeout": 1}) + transport_loader = Mock() + loader = ConfigLoader( + discoverer=lambda *_args, **_kwargs: "config.yaml", + parser=parser, + transport_loader=transport_loader, + ) + data, path = loader.load() + parser.assert_called_once_with("config.yaml") + transport_loader.assert_called_once_with({"timeout": 1}) + assert data == {"timeout": 1} + assert path == "config.yaml" + + +def test_config_loader_load_returns_none_when_not_found(): + loader = ConfigLoader(discoverer=lambda *_: None) + data, path = loader.load() + assert data is None + assert path is None + + +def test_config_loader_apply_merges_data(): + parser = Mock(return_value={"log_level": "INFO"}) + mock_config = Mock(spec=Configuration) + loader = ConfigLoader( + discoverer=lambda *_: "config.yaml", + parser=parser, + transport_loader=Mock(), + config_instance=mock_config, + ) + assert loader.apply() is True + mock_config.update.assert_called_once_with({"log_level": "INFO"}) + + +def test_config_loader_apply_handles_parser_errors(): + parser = Mock(side_effect=ConfigFileError("boom")) + mock_config = Mock(spec=Configuration) + loader = ConfigLoader( + discoverer=lambda *_: "config.yaml", + parser=parser, + transport_loader=Mock(), + config_instance=mock_config, + ) + assert loader.apply() is False mock_config.update.assert_not_called() - mock_load_custom.assert_not_called() - - -def test_apply_config_file_handles_custom_transport_errors(config_files): - """apply_config_file should return False if load_custom_transports fails.""" - with patch( - "mcp_fuzzer.config.loader.config" - ) as mock_config, patch( - "mcp_fuzzer.config.loader.load_config_file" - ) as mock_load_config, patch( - "mcp_fuzzer.config.loader.load_custom_transports" - ) as mock_load_custom: - mock_load_config.return_value = {"custom_transports": {}} - mock_load_custom.side_effect = ConfigFileError("bad transport") - result = apply_config_file(config_path=config_files["yaml_path"]) - assert result is False + +def test_config_loader_apply_handles_transport_errors(): + parser = Mock(return_value={"timeout": 5}) + transport_loader = Mock(side_effect=ConfigFileError("bad transport")) + mock_config = Mock(spec=Configuration) + loader = ConfigLoader( + discoverer=lambda *_: "config.yaml", + parser=parser, + transport_loader=transport_loader, + config_instance=mock_config, + ) + assert loader.apply() is False mock_config.update.assert_not_called() -def test_get_config_schema(): - """Test getting the configuration schema.""" - schema = get_config_schema() - assert isinstance(schema, dict) - assert schema["type"] == "object" - assert "properties" in schema - assert "timeout" in schema["properties"] - assert "log_level" in schema["properties"] - assert "safety" in schema["properties"] +def test_config_loader_load_logs_at_debug_level(config_files): + """Test that loading configuration logs at DEBUG level.""" + import logging + + with patch("mcp_fuzzer.config.loading.loader.logger") as mock_logger: + loader = ConfigLoader() + loader.load(config_path=config_files["yaml_path"]) + # Should log at DEBUG level, not INFO + mock_logger.debug.assert_called() + mock_logger.info.assert_not_called() + + +def test_config_loader_apply_logs_failures(): + """Test that apply() logs failures at DEBUG level.""" + parser = Mock(side_effect=ConfigFileError("test error")) + mock_config = Mock(spec=Configuration) + loader = ConfigLoader( + discoverer=lambda *_: "config.yaml", + parser=parser, + transport_loader=Mock(), + config_instance=mock_config, + ) + with patch("mcp_fuzzer.config.loading.loader.logger") as mock_logger: + result = loader.apply() + assert result is False + # Should log the failure at DEBUG level + mock_logger.debug.assert_called() + # Check that the log message contains the expected text + call_args_str = str(mock_logger.debug.call_args) + assert ( + "Failed to apply configuration" in call_args_str + or "test error" in call_args_str + ) + + +def test_config_loader_load_from_params(): + """Test ConfigLoader.load_from_params() method.""" + parser = Mock(return_value={"timeout": 30}) + params = ConfigSearchParams(config_path="test.yaml") + loader = ConfigLoader( + discoverer=lambda *args, **kwargs: "test.yaml", + parser=parser, + transport_loader=Mock(), + ) + data, path = loader.load_from_params(params) + assert data == {"timeout": 30} + assert path == "test.yaml" + + +def test_config_loader_apply_from_params(): + """Test ConfigLoader.apply_from_params() method.""" + parser = Mock(return_value={"log_level": "DEBUG"}) + mock_config = Mock(spec=Configuration) + params = ConfigSearchParams(config_path="test.yaml") + loader = ConfigLoader( + discoverer=lambda *args, **kwargs: "test.yaml", + parser=parser, + transport_loader=Mock(), + config_instance=mock_config, + ) + result = loader.apply_from_params(params) + assert result is True + mock_config.update.assert_called_once_with({"log_level": "DEBUG"}) diff --git a/tests/unit/config/test_constants.py b/tests/unit/config/test_constants.py new file mode 100644 index 0000000..3bbe276 --- /dev/null +++ b/tests/unit/config/test_constants.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +"""Unit tests for configuration constants.""" + +from __future__ import annotations + +import pytest + +from mcp_fuzzer.config import ( + DEFAULT_FORCE_KILL_TIMEOUT, + DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT, + DEFAULT_MAX_TOTAL_FUZZING_TIME, + DEFAULT_MAX_TOOL_TIME, + DEFAULT_PROTOCOL_RUNS_PER_TYPE, + DEFAULT_PROTOCOL_VERSION, + DEFAULT_TIMEOUT, + DEFAULT_TOOL_RUNS, + DEFAULT_TOOL_TIMEOUT, + PROCESS_CLEANUP_TIMEOUT, + PROCESS_FORCE_KILL_TIMEOUT, + PROCESS_TERMINATION_TIMEOUT, + PROCESS_WAIT_TIMEOUT, + SAFETY_ENV_ALLOWLIST, + SAFETY_HEADER_DENYLIST, + SAFETY_LOCAL_HOSTS, + SAFETY_NO_NETWORK_DEFAULT, + SAFETY_PROXY_ENV_DENYLIST, +) + + +def test_process_constants_exported(): + """Test that PROCESS_* constants are properly exported.""" + assert PROCESS_TERMINATION_TIMEOUT == 0.5 + assert PROCESS_FORCE_KILL_TIMEOUT == 1.0 + assert PROCESS_CLEANUP_TIMEOUT == 5.0 + assert PROCESS_WAIT_TIMEOUT == 1.0 + assert isinstance(PROCESS_TERMINATION_TIMEOUT, float) + assert isinstance(PROCESS_FORCE_KILL_TIMEOUT, float) + assert isinstance(PROCESS_CLEANUP_TIMEOUT, float) + assert isinstance(PROCESS_WAIT_TIMEOUT, float) + + +def test_safety_no_network_default(): + """Test that SAFETY_NO_NETWORK_DEFAULT is properly documented.""" + # The constant should be False (allow network by default) + # with a clear comment explaining it + assert SAFETY_NO_NETWORK_DEFAULT is False + assert isinstance(SAFETY_NO_NETWORK_DEFAULT, bool) + + +def test_safety_local_hosts(): + """Test that SAFETY_LOCAL_HOSTS contains expected local hosts.""" + assert "localhost" in SAFETY_LOCAL_HOSTS + assert "127.0.0.1" in SAFETY_LOCAL_HOSTS + assert "::1" in SAFETY_LOCAL_HOSTS + assert isinstance(SAFETY_LOCAL_HOSTS, set) + + +def test_safety_header_denylist(): + """Test that SAFETY_HEADER_DENYLIST contains sensitive headers.""" + assert "authorization" in SAFETY_HEADER_DENYLIST + assert "cookie" in SAFETY_HEADER_DENYLIST + assert isinstance(SAFETY_HEADER_DENYLIST, set) + + +def test_safety_proxy_env_denylist(): + """Test that SAFETY_PROXY_ENV_DENYLIST contains proxy env vars.""" + expected_vars = { + "HTTP_PROXY", + "HTTPS_PROXY", + "ALL_PROXY", + "NO_PROXY", + "http_proxy", + "https_proxy", + "all_proxy", + "no_proxy", + } + assert expected_vars.issubset(SAFETY_PROXY_ENV_DENYLIST) + assert isinstance(SAFETY_PROXY_ENV_DENYLIST, set) + + +def test_safety_env_allowlist(): + """Test that SAFETY_ENV_ALLOWLIST is a set.""" + assert isinstance(SAFETY_ENV_ALLOWLIST, set) + + +def test_default_timeout_values(): + """Test that default timeout values are positive floats.""" + assert DEFAULT_TIMEOUT > 0 + assert DEFAULT_TOOL_TIMEOUT > 0 + assert DEFAULT_MAX_TOOL_TIME > 0 + assert DEFAULT_MAX_TOTAL_FUZZING_TIME > 0 + assert DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT > 0 + assert DEFAULT_FORCE_KILL_TIMEOUT > 0 + assert all(isinstance(v, float) for v in [ + DEFAULT_TIMEOUT, + DEFAULT_TOOL_TIMEOUT, + DEFAULT_MAX_TOOL_TIME, + DEFAULT_MAX_TOTAL_FUZZING_TIME, + DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT, + DEFAULT_FORCE_KILL_TIMEOUT, + ]) + + +def test_default_run_counts(): + """Test that default run counts are positive integers.""" + assert DEFAULT_TOOL_RUNS > 0 + assert DEFAULT_PROTOCOL_RUNS_PER_TYPE > 0 + assert isinstance(DEFAULT_TOOL_RUNS, int) + assert isinstance(DEFAULT_PROTOCOL_RUNS_PER_TYPE, int) + + +def test_default_protocol_version(): + """Test that default protocol version is a string.""" + assert isinstance(DEFAULT_PROTOCOL_VERSION, str) + assert len(DEFAULT_PROTOCOL_VERSION) > 0 + diff --git a/tests/unit/config/test_discovery_transports.py b/tests/unit/config/test_discovery_transports.py new file mode 100644 index 0000000..4720cc5 --- /dev/null +++ b/tests/unit/config/test_discovery_transports.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python3 +"""Targeted tests for the config discovery and transport helpers.""" + +from __future__ import annotations + +import os +import tempfile + +import pytest + +from mcp_fuzzer.config.loading.discovery import ( + find_config_file, + find_config_file_from_params, +) +from mcp_fuzzer.config.loading.search_params import ConfigSearchParams +from mcp_fuzzer.config.extensions.transports import load_custom_transports +from mcp_fuzzer.exceptions import ConfigFileError +from mcp_fuzzer.transport.base import TransportProtocol +from mcp_fuzzer.transport.custom import list_custom_transports, registry + + +class DummyTransport(TransportProtocol): + """Simple transport stub used for registration tests.""" + + async def send_request(self, method: str, params=None): + return {"jsonrpc": "2.0", "result": {}, "id": 1} + + async def send_raw(self, payload): + return {"result": "ok"} + + async def send_notification(self, method: str, params=None): + return None + + async def _stream_request(self, payload): + yield {"jsonrpc": "2.0", "result": payload} + + +class NonTransport: + """Helper class that does not inherit TransportProtocol.""" + + +@pytest.fixture(autouse=True) +def clear_registry(): + """Always clear custom transport registry before and after each test.""" + registry.clear() + yield + registry.clear() + + +def test_find_config_file_prefers_explicit_path(tmp_path): + """Explicit config_path should be returned even if other files exist.""" + path = tmp_path / "mcp-fuzzer.yaml" + path.write_text("timeout: 5") + result = find_config_file( + config_path=str(path), search_paths=[str(tmp_path)] + ) + assert result == str(path) + + +def test_find_config_file_search_paths(tmp_path): + """Search paths should be honored when they contain a config file.""" + path = tmp_path / "mcp-fuzzer.yml" + path.write_text("timeout: 10\n") + result = find_config_file( + search_paths=[str(tmp_path)], file_names=["mcp-fuzzer.yml"] + ) + assert result == str(path) + + +def test_find_config_file_returns_none_when_missing(tmp_path): + """Return None when no configuration file exists in the requested paths.""" + assert find_config_file(search_paths=[str(tmp_path)]) is None + + +def test_load_custom_transports_registers_transport(): + """Valid transport entry should be registered in the custom registry.""" + config_data = { + "custom_transports": { + "dummy": { + "module": __name__, + "class": "DummyTransport", + "description": "Unit test transport", + } + } + } + + load_custom_transports(config_data) + transports = list_custom_transports() + assert "dummy" in transports + + +def test_load_custom_transports_missing_module_raises(): + """Non-existent module should raise ConfigFileError.""" + with pytest.raises(ConfigFileError): + load_custom_transports( + { + "custom_transports": { + "missing": {"module": "no.such.module", "class": "FooTransport"} + } + } + ) + + +def test_load_custom_transports_invalid_class_raises(): + """Classes that do not inherit TransportProtocol should fail validation.""" + with pytest.raises(ConfigFileError): + load_custom_transports( + { + "custom_transports": { + "invalid": {"module": __name__, "class": "NonTransport"} + } + } + ) + + +def test_find_config_file_from_params(tmp_path): + """Test find_config_file_from_params with ConfigSearchParams.""" + path = tmp_path / "mcp-fuzzer.yaml" + path.write_text("timeout: 5") + params = ConfigSearchParams( + config_path=str(path), + search_paths=[str(tmp_path)], + file_names=["mcp-fuzzer.yaml"], + ) + result = find_config_file_from_params(params) + assert result == str(path) + + +def test_find_config_file_from_params_search(tmp_path): + """Test find_config_file_from_params with search paths.""" + path = tmp_path / "mcp-fuzzer.yml" + path.write_text("timeout: 10") + params = ConfigSearchParams( + config_path=None, + search_paths=[str(tmp_path)], + file_names=["mcp-fuzzer.yml"], + ) + result = find_config_file_from_params(params) + assert result == str(path) + + +def test_load_custom_transports_non_class_raises(): + """Non-class attributes should raise ConfigFileError with TypeError handling.""" + config_data = { + "custom_transports": { + "invalid": { + "module": __name__, + # This is a function, not a class + "class": "test_load_custom_transports_non_class_raises", + } + } + } + with pytest.raises(ConfigFileError, match="is not a class"): + load_custom_transports(config_data) + + +def test_load_custom_transports_logs_with_percent_formatting(monkeypatch): + """Test that transport loading uses proper logging format (not f-strings).""" + from unittest.mock import Mock + + mock_logger = Mock() + monkeypatch.setattr("mcp_fuzzer.config.extensions.transports.logger", mock_logger) + + config_data = { + "custom_transports": { + "dummy": { + "module": __name__, + "class": "DummyTransport", + "description": "Test transport", + } + } + } + + load_custom_transports(config_data) + + # Verify logger.info was called with separate arguments (not f-string) + mock_logger.info.assert_called_once() + call_args = mock_logger.info.call_args + # Should be called with format string and separate args (not f-string) + assert len(call_args[0]) == 4 # format string + 3 args + assert call_args[0][0] == "Loaded custom transport '%s' from %s.%s" + assert call_args[0][1] == "dummy" + assert call_args[0][2] == __name__ + assert call_args[0][3] == "DummyTransport" diff --git a/tests/unit/config/test_manager.py b/tests/unit/config/test_manager.py new file mode 100644 index 0000000..0027507 --- /dev/null +++ b/tests/unit/config/test_manager.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +"""Unit tests for configuration manager.""" + +from __future__ import annotations + +import os +from unittest.mock import patch + +import pytest + +from mcp_fuzzer.config.core.manager import ( + Configuration, + _get_bool_from_env, + _get_float_from_env, + config, +) + + +def test_get_float_from_env_with_value(): + """Test _get_float_from_env with valid environment variable.""" + with patch.dict(os.environ, {"TEST_FLOAT": "42.5"}): + assert _get_float_from_env("TEST_FLOAT", 0.0) == 42.5 + + +def test_get_float_from_env_with_default(): + """Test _get_float_from_env when variable is not set.""" + with patch.dict(os.environ, {}, clear=True): + assert _get_float_from_env("TEST_FLOAT", 10.0) == 10.0 + + +def test_get_float_from_env_with_invalid_value(): + """Test _get_float_from_env with invalid value falls back to default.""" + with patch.dict(os.environ, {"TEST_FLOAT": "not_a_number"}): + assert _get_float_from_env("TEST_FLOAT", 5.0) == 5.0 + + +def test_get_bool_from_env_true_values(): + """Test _get_bool_from_env with various true values.""" + true_values = ["1", "true", "True", "TRUE", "yes", "YES", "on", "ON"] + for val in true_values: + with patch.dict(os.environ, {"TEST_BOOL": val}): + assert _get_bool_from_env("TEST_BOOL", False) is True + + +def test_get_bool_from_env_false_values(): + """Test _get_bool_from_env with false values.""" + false_values = ["0", "false", "no", "off", "anything_else"] + for val in false_values: + with patch.dict(os.environ, {"TEST_BOOL": val}): + assert _get_bool_from_env("TEST_BOOL", True) is False + + +def test_get_bool_from_env_with_default(): + """Test _get_bool_from_env when variable is not set.""" + with patch.dict(os.environ, {}, clear=True): + assert _get_bool_from_env("TEST_BOOL", True) is True + assert _get_bool_from_env("TEST_BOOL", False) is False + + +def test_configuration_loads_from_env(): + """Test Configuration loads values from environment variables.""" + env_vars = { + "MCP_FUZZER_TIMEOUT": "60.0", + "MCP_FUZZER_LOG_LEVEL": "DEBUG", + "MCP_FUZZER_SAFETY_ENABLED": "true", + "MCP_FUZZER_FS_ROOT": "/custom/path", + "MCP_FUZZER_HTTP_TIMEOUT": "45.0", + "MCP_FUZZER_SSE_TIMEOUT": "50.0", + "MCP_FUZZER_STDIO_TIMEOUT": "55.0", + } + with patch.dict(os.environ, env_vars): + cfg = Configuration() + assert cfg.get("timeout") == 60.0 + assert cfg.get("log_level") == "DEBUG" + assert cfg.get("safety_enabled") is True + assert cfg.get("fs_root") == "/custom/path" + assert cfg.get("http_timeout") == 45.0 + assert cfg.get("sse_timeout") == 50.0 + assert cfg.get("stdio_timeout") == 55.0 + + +def test_configuration_uses_defaults(): + """Test Configuration uses defaults when env vars are not set.""" + with patch.dict(os.environ, {}, clear=True): + cfg = Configuration() + assert cfg.get("timeout") == 30.0 + assert cfg.get("log_level") == "INFO" + assert cfg.get("safety_enabled") is False + + +def test_configuration_get_with_default(): + """Test Configuration.get() with custom default.""" + cfg = Configuration() + assert cfg.get("nonexistent_key", "default_value") == "default_value" + + +def test_configuration_set(): + """Test Configuration.set() method.""" + cfg = Configuration() + cfg.set("custom_key", "custom_value") + assert cfg.get("custom_key") == "custom_value" + + +def test_configuration_update(): + """Test Configuration.update() method.""" + cfg = Configuration() + cfg.update({"key1": "value1", "key2": "value2"}) + assert cfg.get("key1") == "value1" + assert cfg.get("key2") == "value2" + + +def test_global_config_instance(): + """Test that global config instance exists and is usable.""" + assert config is not None + assert isinstance(config, Configuration) + # Should have default values + assert config.get("timeout") is not None + diff --git a/tests/unit/config/test_parser.py b/tests/unit/config/test_parser.py new file mode 100644 index 0000000..9a58984 --- /dev/null +++ b/tests/unit/config/test_parser.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +"""Unit tests for configuration parser.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import mock_open, patch + +import pytest +import yaml + +from mcp_fuzzer.config.loading.parser import load_config_file +from mcp_fuzzer.exceptions import ConfigFileError + + +def test_load_config_file_success(tmp_path): + """Test successful loading of YAML config file.""" + config_path = tmp_path / "test.yaml" + config_path.write_text("timeout: 30.0\nlog_level: DEBUG") + + result = load_config_file(str(config_path)) + assert result["timeout"] == 30.0 + assert result["log_level"] == "DEBUG" + + +def test_load_config_file_not_found(): + """Test that missing file raises ConfigFileError.""" + with pytest.raises(ConfigFileError, match="Configuration file not found"): + load_config_file("/nonexistent/path.yaml") + + +def test_load_config_file_invalid_extension(tmp_path): + """Test that non-YAML files raise ConfigFileError.""" + txt_file = tmp_path / "config.txt" + txt_file.write_text("timeout: 30") + with pytest.raises(ConfigFileError, match="Unsupported configuration file format"): + load_config_file(str(txt_file)) + + +def test_load_config_file_invalid_yaml(tmp_path): + """Test that invalid YAML raises ConfigFileError with proper message.""" + invalid_yaml = tmp_path / "invalid.yaml" + invalid_yaml.write_text("timeout: [1,") # Invalid YAML syntax + + with pytest.raises(ConfigFileError, match="Error parsing YAML"): + load_config_file(str(invalid_yaml)) + + +def test_load_config_file_permission_error(tmp_path): + """Test that permission errors raise ConfigFileError.""" + config_path = tmp_path / "test.yaml" + config_path.write_text("timeout: 30") + + with patch("builtins.open", side_effect=PermissionError("Access denied")): + with pytest.raises(ConfigFileError, match="Permission denied"): + load_config_file(str(config_path)) + + +def test_load_config_file_exception_formatting(): + """Test exceptions formatted without redundant str() conversion.""" + config_path = "/test/path.yaml" + + # Mock file exists check + with patch("os.path.isfile", return_value=True): + # Mock YAML parsing error + yaml_error = yaml.YAMLError("YAML syntax error") + with patch("builtins.open", mock_open(read_data="invalid: yaml: [")): + with patch("yaml.safe_load", side_effect=yaml_error): + with pytest.raises(ConfigFileError) as exc_info: + load_config_file(config_path) + # Exception message should contain the error without redundant str() + assert "YAML syntax error" in str(exc_info.value) + assert config_path in str(exc_info.value) + + +def test_load_config_file_empty_file(tmp_path): + """Test that empty file returns empty dict.""" + empty_file = tmp_path / "empty.yaml" + empty_file.write_text("") + + result = load_config_file(str(empty_file)) + assert result == {} + + +def test_load_config_file_none_yaml_result(tmp_path): + """Test that YAML file with only comments returns empty dict.""" + comment_file = tmp_path / "comments.yaml" + comment_file.write_text("# Just a comment\n# Another comment") + + result = load_config_file(str(comment_file)) + assert result == {} + diff --git a/tests/unit/config/test_schema_builders.py b/tests/unit/config/test_schema_builders.py new file mode 100644 index 0000000..feebd55 --- /dev/null +++ b/tests/unit/config/test_schema_builders.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +"""Unit tests for schema builder functions.""" + +from __future__ import annotations + +import pytest + +from mcp_fuzzer.config.schema.builders import ( + build_auth_schema, + build_basic_schema, + build_custom_transports_schema, + build_fuzzing_schema, + build_network_schema, + build_output_schema, + build_safety_schema, + build_timeout_schema, +) +from mcp_fuzzer.config.schema.composer import get_config_schema + + +def test_build_timeout_schema(): + """Test that timeout schema includes all timeout-related fields.""" + schema = build_timeout_schema() + assert "timeout" in schema + assert "tool_timeout" in schema + assert "http_timeout" in schema + assert "sse_timeout" in schema + assert "stdio_timeout" in schema + assert all(field["type"] == "number" for field in schema.values()) + + +def testbuild_basic_schema(): + """Test that basic schema includes log level, safety, and fs_root.""" + schema = build_basic_schema() + assert "log_level" in schema + assert "safety_enabled" in schema + assert "fs_root" in schema + assert schema["log_level"]["type"] == "string" + assert schema["safety_enabled"]["type"] == "boolean" + + +def testbuild_fuzzing_schema(): + """Test that fuzzing schema includes mode, phase, protocol, etc.""" + schema = build_fuzzing_schema() + assert "mode" in schema + assert "phase" in schema + assert "protocol" in schema + assert "endpoint" in schema + assert "runs" in schema + assert "runs_per_type" in schema + assert "max_concurrency" in schema + + +def testbuild_network_schema(): + """Test that network schema includes network-related fields.""" + schema = build_network_schema() + assert "no_network" in schema + assert "allow_hosts" in schema + assert schema["no_network"]["type"] == "boolean" + assert schema["allow_hosts"]["type"] == "array" + + +def testbuild_auth_schema(): + """Test that auth schema includes auth configuration.""" + schema = build_auth_schema() + assert "auth" in schema + auth_schema = schema["auth"] + assert auth_schema["type"] == "object" + assert "providers" in auth_schema["properties"] + assert "mappings" in auth_schema["properties"] + + +def testbuild_custom_transports_schema(): + """Test that custom transports schema is properly structured.""" + schema = build_custom_transports_schema() + assert "custom_transports" in schema + transport_schema = schema["custom_transports"] + assert transport_schema["type"] == "object" + assert "patternProperties" in transport_schema + assert "additionalProperties" in transport_schema + assert transport_schema["additionalProperties"] is False + + +def testbuild_safety_schema(): + """Test that safety schema includes all safety-related fields.""" + schema = build_safety_schema() + assert "safety" in schema + safety_schema = schema["safety"] + assert safety_schema["type"] == "object" + props = safety_schema["properties"] + assert "enabled" in props + assert "local_hosts" in props + assert "no_network" in props + assert "header_denylist" in props + + +def testbuild_output_schema(): + """Test that output schema includes output configuration.""" + schema = build_output_schema() + assert "output" in schema + output_schema = schema["output"] + assert output_schema["type"] == "object" + props = output_schema["properties"] + assert "format" in props + assert "directory" in props + assert "compress" in props + assert "types" in props + assert "retention" in props + + +def test_get_config_schema_composition(): + """Test that get_config_schema composes all builder functions.""" + schema = get_config_schema() + assert schema["type"] == "object" + assert "properties" in schema + + properties = schema["properties"] + # Check that all sections are included + assert "timeout" in properties + assert "log_level" in properties + assert "mode" in properties + assert "no_network" in properties + assert "auth" in properties + assert "custom_transports" in properties + assert "safety" in properties + assert "output" in properties + + +def test_get_config_schema_structure(): + """Test that the complete schema has proper JSON schema structure.""" + schema = get_config_schema() + assert isinstance(schema, dict) + assert "type" in schema + assert "properties" in schema + assert schema["type"] == "object" + assert isinstance(schema["properties"], dict) + + +def test_schema_builders_are_independent(): + """Test that schema builders can be called independently.""" + timeout = build_timeout_schema() + basic = build_basic_schema() + fuzzing = build_fuzzing_schema() + + # Should not have overlapping keys + assert not set(timeout.keys()) & set(basic.keys()) + assert not set(timeout.keys()) & set(fuzzing.keys()) + assert not set(basic.keys()) & set(fuzzing.keys()) + diff --git a/tests/unit/config/test_search_params.py b/tests/unit/config/test_search_params.py new file mode 100644 index 0000000..65c5721 --- /dev/null +++ b/tests/unit/config/test_search_params.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +"""Unit tests for ConfigSearchParams dataclass.""" + +from __future__ import annotations + +import pytest + +from mcp_fuzzer.config.loading.search_params import ConfigSearchParams + + +def test_config_search_params_defaults(): + """Test that ConfigSearchParams has sensible defaults.""" + params = ConfigSearchParams() + assert params.config_path is None + assert params.search_paths is None + assert params.file_names is None + + +def test_config_search_params_with_values(): + """Test ConfigSearchParams with provided values.""" + params = ConfigSearchParams( + config_path="/path/to/config.yaml", + search_paths=["/dir1", "/dir2"], + file_names=["config.yml", "config.yaml"], + ) + assert params.config_path == "/path/to/config.yaml" + assert params.search_paths == ["/dir1", "/dir2"] + assert params.file_names == ["config.yml", "config.yaml"] + + +def test_config_search_params_partial(): + """Test ConfigSearchParams with partial values.""" + params = ConfigSearchParams(config_path="/explicit/path.yaml") + assert params.config_path == "/explicit/path.yaml" + assert params.search_paths is None + assert params.file_names is None + + +def test_config_search_params_equality(): + """Test that ConfigSearchParams supports equality comparison.""" + params1 = ConfigSearchParams( + config_path="/path.yaml", + search_paths=["/dir1"], + file_names=["config.yaml"], + ) + params2 = ConfigSearchParams( + config_path="/path.yaml", + search_paths=["/dir1"], + file_names=["config.yaml"], + ) + params3 = ConfigSearchParams(config_path="/different.yaml") + + assert params1 == params2 + assert params1 != params3 + + +def test_config_search_params_repr(): + """Test that ConfigSearchParams has a useful string representation.""" + params = ConfigSearchParams(config_path="/test.yaml") + repr_str = repr(params) + assert "ConfigSearchParams" in repr_str + assert "/test.yaml" in repr_str +