diff --git a/README.md b/README.md index 6ba5d3b1..9fc74360 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ A Model Context Protocol (MCP) server for read-only Linux system administration, - **Remote SSH Execution**: Execute commands on remote systems via SSH with key-based authentication - **Multi-Host Management**: Connect to different remote hosts in the same session - **Comprehensive Diagnostics**: System info, services, processes, logs, network, and storage +- **Package Insights (DNF)**: Query packages, repositories, provides, groups, and modules - **Configurable Log Access**: Control which log files can be accessed via environment variables - **RHEL/systemd Focused**: Optimized for Red Hat Enterprise Linux systems diff --git a/docs/api/index.md b/docs/api/index.md index f4c4c7f8..472358f4 100644 --- a/docs/api/index.md +++ b/docs/api/index.md @@ -19,6 +19,7 @@ MCP tools organized by category: - **[Services](tools/services.md)** - Systemd service management - **[Processes](tools/processes.md)** - Process listing and details - **[Logs](tools/logs.md)** - Journal, audit, and log file access +- **[DNF](tools/dnf.md)** - Package and repository information - **[Network](tools/network.md)** - Network interfaces, connections, ports, routes - **[Storage](tools/storage.md)** - Block devices, directory and file listing diff --git a/docs/api/tools/dnf.md b/docs/api/tools/dnf.md new file mode 100644 index 00000000..b02d2ade --- /dev/null +++ b/docs/api/tools/dnf.md @@ -0,0 +1,6 @@ +# DNF Tools + +::: linux_mcp_server.tools.dnf + options: + show_root_heading: true + show_root_full_path: false diff --git a/docs/cheatsheet.md b/docs/cheatsheet.md index 06b1fdd2..825d510a 100644 --- a/docs/cheatsheet.md +++ b/docs/cheatsheet.md @@ -24,6 +24,22 @@ A quick reference guide for common tasks and the tools to use. | **Service Logs** | `get_service_logs` | "Show recent logs for sshd." | | **Specific Log File** | `read_log_file` | "Read the last 50 lines of /var/log/messages." | +## 📦 Packages (DNF) + +| I want to check... | Use this tool | Example Prompt | +|-------------------|---------------|----------------| +| **Installed Packages** | `list_dnf_installed_packages` | "List all installed packages." | +| **Available Packages** | `list_dnf_available_packages` | "What packages are available in repos?" | +| **Package Details** | `get_dnf_package_info` | "Show details for bash." | +| **Repositories** | `list_dnf_repositories` | "Which repositories are enabled?" | +| **File Provides** | `dnf_provides` | "Which package provides /usr/bin/python3?" | +| **Repository Info** | `get_dnf_repo_info` | "Show details for baseos." | +| **Group List** | `list_dnf_groups` | "List all package groups." | +| **Group Info** | `get_dnf_group_info` | "Show details for Development Tools." | +| **Group Summary** | `get_dnf_group_summary` | "Summarize installed groups." | +| **Module List** | `list_dnf_modules` | "List nodejs module streams." | +| **Module Provides** | `dnf_module_provides` | "Which module provides python3?" | + ## 🌐 Network | I want to check... | Use this tool | Example Prompt | diff --git a/docs/contributing.md b/docs/contributing.md index b8a66518..f477b340 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -79,6 +79,7 @@ pytest linux-mcp-server/ ├── src/linux_mcp_server/ │ ├── tools/ # MCP tool implementations +│ │ ├── dnf.py # DNF package manager tools │ │ ├── logs.py # Log reading tools │ │ ├── network.py # Network diagnostic tools │ │ ├── processes.py # Process management tools diff --git a/docs/usage.md b/docs/usage.md index f87ce3a7..3f7e64bb 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -189,6 +189,120 @@ Reads a specific log file (must be in the allowed list). **Security Note:** This tool respects the `LINUX_MCP_ALLOWED_LOG_PATHS` environment variable whitelist. +### Package Management (DNF) + +#### `list_dnf_installed_packages` +Lists installed packages via `dnf`. + +**Parameters:** +- `host` (string, optional): Remote host identifier +- `limit` (number, optional): Maximum number of output lines to return (default: 500) +- `offset` (number, optional): Number of output lines to skip (default: 0) +- `no_limit` (boolean, optional): Disable output truncation (default: false) + +**Example use case:** "Show me all installed packages." + +#### `list_dnf_available_packages` +Lists packages available in configured repositories. + +**Parameters:** +- `host` (string, optional): Remote host identifier +- `limit` (number, optional): Maximum number of output lines to return (default: 500) +- `offset` (number, optional): Number of output lines to skip (default: 0) +- `no_limit` (boolean, optional): Disable output truncation (default: false) + +**Example use case:** "Which packages are available from enabled repos?" + +#### `get_dnf_package_info` +Returns detailed information for a specific package. + +**Parameters:** +- `package` (string, required): Package name (e.g., "bash", "openssl") +- `host` (string, optional): Remote host identifier + +**Example use case:** "Get details for the bash package." + +#### `list_dnf_repositories` +Lists configured repositories and their status. + +**Parameters:** +- `host` (string, optional): Remote host identifier +- `limit` (number, optional): Maximum number of output lines to return (default: 500) +- `offset` (number, optional): Number of output lines to skip (default: 0) +- `no_limit` (boolean, optional): Disable output truncation (default: false) + +**Example use case:** "Show me all configured repositories and whether they are enabled." + +#### `dnf_provides` +Finds packages that provide a file or binary. + +**Parameters:** +- `query` (string, required): File path or binary name (e.g., "/usr/bin/python3", "libssl.so.3") +- `host` (string, optional): Remote host identifier + +**Example use case:** "Which package provides /usr/bin/python3?" + +#### `get_dnf_repo_info` +Shows detailed information for a specific repository. + +**Parameters:** +- `repo_id` (string, required): Repository id (e.g., "baseos", "appstream") +- `host` (string, optional): Remote host identifier + +**Example use case:** "Show details for the baseos repository." + +#### `list_dnf_groups` +Lists available and installed package groups. + +**Parameters:** +- `host` (string, optional): Remote host identifier +- `limit` (number, optional): Maximum number of output lines to return (default: 500) +- `offset` (number, optional): Number of output lines to skip (default: 0) +- `no_limit` (boolean, optional): Disable output truncation (default: false) + +**Example use case:** "List all package groups." + +#### `get_dnf_group_info` +Shows details for a specific package group. + +**Parameters:** +- `group` (string, required): Group name (e.g., "Development Tools") +- `host` (string, optional): Remote host identifier + +**Example use case:** "Show details for the Development Tools group." + +#### `get_dnf_group_summary` +Shows a summary of installed and available groups. + +**Parameters:** +- `host` (string, optional): Remote host identifier +- `limit` (number, optional): Maximum number of output lines to return (default: 500) +- `offset` (number, optional): Number of output lines to skip (default: 0) +- `no_limit` (boolean, optional): Disable output truncation (default: false) + +**Example use case:** "Summarize installed and available groups." + +#### `list_dnf_modules` +Lists modules (optionally filtered by module name). + +**Parameters:** +- `module` (string, optional): Module name filter (e.g., "nodejs") +- `host` (string, optional): Remote host identifier +- `limit` (number, optional): Maximum number of output lines to return (default: 500) +- `offset` (number, optional): Number of output lines to skip (default: 0) +- `no_limit` (boolean, optional): Disable output truncation (default: false) + +**Example use case:** "List available nodejs module streams." + +#### `dnf_module_provides` +Shows modules that provide a specific package. + +**Parameters:** +- `package` (string, required): Package name (e.g., "python3") +- `host` (string, optional): Remote host identifier + +**Example use case:** "Which module provides python3?" + ### Network Diagnostics #### `get_network_interfaces` diff --git a/src/linux_mcp_server/commands.py b/src/linux_mcp_server/commands.py index f6755a9b..0b2048a8 100644 --- a/src/linux_mcp_server/commands.py +++ b/src/linux_mcp_server/commands.py @@ -239,6 +239,62 @@ class CommandGroup(BaseModel): "default": CommandSpec(args=("cat", "{path}")), } ), + # === Packages (dnf) === + "dnf_list_installed": CommandGroup( + commands={ + "default": CommandSpec(args=("dnf", "list", "installed")), + } + ), + "dnf_list_available": CommandGroup( + commands={ + "default": CommandSpec(args=("dnf", "list", "available")), + } + ), + "dnf_package_info": CommandGroup( + commands={ + "default": CommandSpec(args=("dnf", "info", "{package}")), + } + ), + "dnf_repolist": CommandGroup( + commands={ + "default": CommandSpec(args=("dnf", "repolist", "--all")), + } + ), + "dnf_provides": CommandGroup( + commands={ + "default": CommandSpec(args=("dnf", "provides", "{query}")), + } + ), + "dnf_repo_info": CommandGroup( + commands={ + "default": CommandSpec(args=("dnf", "repoinfo", "{repo_id}")), + } + ), + "dnf_group_list": CommandGroup( + commands={ + "default": CommandSpec(args=("dnf", "group", "list")), + } + ), + "dnf_group_info": CommandGroup( + commands={ + "default": CommandSpec(args=("dnf", "group", "info", "{group}")), + } + ), + "dnf_group_summary": CommandGroup( + commands={ + "default": CommandSpec(args=("dnf", "group", "summary")), + } + ), + "dnf_module_list": CommandGroup( + commands={ + "default": CommandSpec(args=("dnf", "module", "list"), optional_flags={"module": ("{module}",)}), + } + ), + "dnf_module_provides": CommandGroup( + commands={ + "default": CommandSpec(args=("dnf", "module", "provides", "{package}")), + } + ), # === System Info === "system_info": CommandGroup( commands={ diff --git a/src/linux_mcp_server/tools/__init__.py b/src/linux_mcp_server/tools/__init__.py index 972c15cc..952b5a24 100644 --- a/src/linux_mcp_server/tools/__init__.py +++ b/src/linux_mcp_server/tools/__init__.py @@ -1,3 +1,16 @@ +# packages (dnf) +from linux_mcp_server.tools.dnf import dnf_module_provides +from linux_mcp_server.tools.dnf import dnf_provides +from linux_mcp_server.tools.dnf import get_dnf_group_info +from linux_mcp_server.tools.dnf import get_dnf_group_summary +from linux_mcp_server.tools.dnf import get_dnf_package_info +from linux_mcp_server.tools.dnf import get_dnf_repo_info +from linux_mcp_server.tools.dnf import list_dnf_available_packages +from linux_mcp_server.tools.dnf import list_dnf_groups +from linux_mcp_server.tools.dnf import list_dnf_installed_packages +from linux_mcp_server.tools.dnf import list_dnf_modules +from linux_mcp_server.tools.dnf import list_dnf_repositories + # logs from linux_mcp_server.tools.logs import get_journal_logs from linux_mcp_server.tools.logs import read_log_file @@ -45,11 +58,22 @@ "get_service_logs", "get_service_status", "get_system_information", + "get_dnf_package_info", + "get_dnf_group_info", + "get_dnf_group_summary", + "get_dnf_repo_info", "list_block_devices", "list_directories", + "list_dnf_available_packages", + "list_dnf_installed_packages", + "list_dnf_groups", + "list_dnf_modules", + "list_dnf_repositories", "list_files", "list_processes", "list_services", + "dnf_module_provides", + "dnf_provides", "read_file", "read_log_file", ] diff --git a/src/linux_mcp_server/tools/dnf.py b/src/linux_mcp_server/tools/dnf.py new file mode 100644 index 00000000..09ef3023 --- /dev/null +++ b/src/linux_mcp_server/tools/dnf.py @@ -0,0 +1,485 @@ +"""DNF package manager tools.""" + +import typing as t + +from mcp.types import ToolAnnotations +from pydantic import Field +from pydantic.functional_validators import BeforeValidator + +from linux_mcp_server.audit import log_tool_call +from linux_mcp_server.commands import get_command +from linux_mcp_server.server import mcp +from linux_mcp_server.utils.decorators import disallow_local_execution_in_containers +from linux_mcp_server.utils.types import Host +from linux_mcp_server.utils.validation import is_empty_output +from linux_mcp_server.utils.validation import validate_dnf_group_name +from linux_mcp_server.utils.validation import validate_dnf_package_name +from linux_mcp_server.utils.validation import validate_dnf_provides_query +from linux_mcp_server.utils.validation import validate_dnf_repo_id +from linux_mcp_server.utils.validation import validate_optional_dnf_module_name + + +DEFAULT_DNF_LIMIT = 500 + + +def _is_package_not_found(stdout: str, stderr: str) -> bool: + combined = f"{stdout}\n{stderr}".casefold() + return "no matching packages to list" in combined or "no match for argument" in combined + + +def _matches_any_message(stdout: str, stderr: str, patterns: t.Sequence[str]) -> bool: + combined = f"{stdout}\n{stderr}".casefold() + return any(pattern in combined for pattern in patterns) + + +def _apply_output_limits(stdout: str, limit: int | None, offset: int, no_limit: bool) -> str: + lines = stdout.splitlines() + total_lines = len(lines) + + if no_limit or limit is None: + if offset <= 0: + return stdout + + sliced = lines[offset:] + if not sliced: + return "No output after applying limit/offset." + + return "\n".join(sliced) + + start = offset + end = offset + limit + sliced = lines[start:end] + + if not sliced: + return "No output after applying limit/offset." + + result = "\n".join(sliced) + if total_lines > end: + result = f"{result}\n... output truncated: showing {len(sliced)} of {total_lines} lines" + + return result + + +async def _run_dnf_command( + command_name: str, + host: Host | None = None, + limit: int | None = DEFAULT_DNF_LIMIT, + offset: int = 0, + no_limit: bool = False, + **kwargs: object, +) -> str: + cmd = get_command(command_name) + returncode, stdout, stderr = await cmd.run(host=host, **kwargs) + + if returncode != 0: + return f"Error running dnf: {stderr}" + + if is_empty_output(stdout): + return "No output returned by dnf." + + return _apply_output_limits(stdout, limit=limit, offset=offset, no_limit=no_limit) + + +@mcp.tool( + title="List installed packages (dnf)", + description="List installed packages via dnf.", + tags={"packages", "dnf", "troubleshooting"}, + annotations=ToolAnnotations(readOnlyHint=True), +) +@log_tool_call +@disallow_local_execution_in_containers +async def list_dnf_installed_packages( + limit: t.Annotated[ + int, + Field( + description="Maximum number of output lines to return", + gt=0, + examples=[DEFAULT_DNF_LIMIT], + ), + ] = DEFAULT_DNF_LIMIT, + offset: t.Annotated[ + int, + Field( + description="Number of output lines to skip", + ge=0, + examples=[0], + ), + ] = 0, + no_limit: t.Annotated[ + bool, + Field( + description="Disable output truncation", + examples=[False], + ), + ] = False, + host: Host = None, +) -> str: + """List installed packages using dnf.""" + return await _run_dnf_command("dnf_list_installed", host=host, limit=limit, offset=offset, no_limit=no_limit) + + +@mcp.tool( + title="List available packages (dnf)", + description="List available packages via dnf.", + tags={"packages", "dnf", "troubleshooting"}, + annotations=ToolAnnotations(readOnlyHint=True), +) +@log_tool_call +@disallow_local_execution_in_containers +async def list_dnf_available_packages( + limit: t.Annotated[ + int, + Field( + description="Maximum number of output lines to return", + gt=0, + examples=[DEFAULT_DNF_LIMIT], + ), + ] = DEFAULT_DNF_LIMIT, + offset: t.Annotated[ + int, + Field( + description="Number of output lines to skip", + ge=0, + examples=[0], + ), + ] = 0, + no_limit: t.Annotated[ + bool, + Field( + description="Disable output truncation", + examples=[False], + ), + ] = False, + host: Host = None, +) -> str: + """List available packages using dnf.""" + return await _run_dnf_command("dnf_list_available", host=host, limit=limit, offset=offset, no_limit=no_limit) + + +@mcp.tool( + title="Package info (dnf)", + description="Get details for a specific package via dnf.", + tags={"packages", "dnf", "troubleshooting"}, + annotations=ToolAnnotations(readOnlyHint=True), +) +@log_tool_call +@disallow_local_execution_in_containers +async def get_dnf_package_info( + package: t.Annotated[ + str, + BeforeValidator(validate_dnf_package_name), + Field(description="Package name", examples=["bash", "openssl", "vim-enhanced", "python3"]), + ], + host: Host = None, +) -> str: + """Get package details using dnf.""" + cmd = get_command("dnf_package_info") + returncode, stdout, stderr = await cmd.run(host=host, package=package) + + if _is_package_not_found(stdout, stderr): + return f"Package '{package}' not found." + + if returncode != 0: + return f"Error running dnf: {stderr}" + + if is_empty_output(stdout): + return "No output returned by dnf." + + return stdout + + +@mcp.tool( + title="List repositories (dnf)", + description="List configured repositories via dnf.", + tags={"packages", "dnf", "troubleshooting"}, + annotations=ToolAnnotations(readOnlyHint=True), +) +@log_tool_call +@disallow_local_execution_in_containers +async def list_dnf_repositories( + limit: t.Annotated[ + int, + Field( + description="Maximum number of output lines to return", + gt=0, + examples=[DEFAULT_DNF_LIMIT], + ), + ] = DEFAULT_DNF_LIMIT, + offset: t.Annotated[ + int, + Field( + description="Number of output lines to skip", + ge=0, + examples=[0], + ), + ] = 0, + no_limit: t.Annotated[ + bool, + Field( + description="Disable output truncation", + examples=[False], + ), + ] = False, + host: Host = None, +) -> str: + """List configured repositories using dnf.""" + return await _run_dnf_command("dnf_repolist", host=host, limit=limit, offset=offset, no_limit=no_limit) + + +@mcp.tool( + title="Find packages providing a file (dnf)", + description="Find packages that provide a specific file or binary via dnf.", + tags={"packages", "dnf", "troubleshooting"}, + annotations=ToolAnnotations(readOnlyHint=True), +) +@log_tool_call +@disallow_local_execution_in_containers +async def dnf_provides( + query: t.Annotated[ + str, + BeforeValidator(validate_dnf_provides_query), + Field(description="File path or binary name", examples=["/usr/bin/python3", "libssl.so.3", "*/libssl.so.*"]), + ], + host: Host = None, +) -> str: + """Find packages providing a file or binary using dnf.""" + cmd = get_command("dnf_provides") + returncode, stdout, stderr = await cmd.run(host=host, query=query) + + if _matches_any_message(stdout, stderr, ("no matches found", "no match for argument")): + return f"No packages provide '{query}'." + + if returncode != 0: + return f"Error running dnf: {stderr}" + + if is_empty_output(stdout): + return "No output returned by dnf." + + return stdout + + +@mcp.tool( + title="Repository info (dnf)", + description="Get detailed information for a specific repository via dnf.", + tags={"packages", "dnf", "troubleshooting"}, + annotations=ToolAnnotations(readOnlyHint=True), +) +@log_tool_call +@disallow_local_execution_in_containers +async def get_dnf_repo_info( + repo_id: t.Annotated[ + str, + BeforeValidator(validate_dnf_repo_id), + Field(description="Repository id", examples=["baseos", "appstream"]), + ], + host: Host = None, +) -> str: + """Get repository details using dnf.""" + cmd = get_command("dnf_repo_info") + returncode, stdout, stderr = await cmd.run(host=host, repo_id=repo_id) + + if _matches_any_message(stdout, stderr, ("no matching repo", "no repository match", "no such repository")): + return f"Repository '{repo_id}' not found." + + if returncode != 0: + return f"Error running dnf: {stderr}" + + if is_empty_output(stdout): + return "No output returned by dnf." + + return stdout + + +@mcp.tool( + title="List groups (dnf)", + description="List available and installed groups via dnf.", + tags={"packages", "dnf", "troubleshooting"}, + annotations=ToolAnnotations(readOnlyHint=True), +) +@log_tool_call +@disallow_local_execution_in_containers +async def list_dnf_groups( + limit: t.Annotated[ + int, + Field( + description="Maximum number of output lines to return", + gt=0, + examples=[DEFAULT_DNF_LIMIT], + ), + ] = DEFAULT_DNF_LIMIT, + offset: t.Annotated[ + int, + Field( + description="Number of output lines to skip", + ge=0, + examples=[0], + ), + ] = 0, + no_limit: t.Annotated[ + bool, + Field( + description="Disable output truncation", + examples=[False], + ), + ] = False, + host: Host = None, +) -> str: + """List group information using dnf.""" + return await _run_dnf_command("dnf_group_list", host=host, limit=limit, offset=offset, no_limit=no_limit) + + +@mcp.tool( + title="Group info (dnf)", + description="Get details for a specific group via dnf.", + tags={"packages", "dnf", "troubleshooting"}, + annotations=ToolAnnotations(readOnlyHint=True), +) +@log_tool_call +@disallow_local_execution_in_containers +async def get_dnf_group_info( + group: t.Annotated[ + str, + BeforeValidator(validate_dnf_group_name), + Field(description="Group name", examples=["Development Tools", "Server with GUI"]), + ], + host: Host = None, +) -> str: + """Get group details using dnf.""" + cmd = get_command("dnf_group_info") + returncode, stdout, stderr = await cmd.run(host=host, group=group) + + if _matches_any_message(stdout, stderr, ("no groups matched", "no match for argument")): + return f"Group '{group}' not found." + + if returncode != 0: + return f"Error running dnf: {stderr}" + + if is_empty_output(stdout): + return "No output returned by dnf." + + return stdout + + +@mcp.tool( + title="Group summary (dnf)", + description="Show a summary of installed and available groups via dnf.", + tags={"packages", "dnf", "troubleshooting"}, + annotations=ToolAnnotations(readOnlyHint=True), +) +@log_tool_call +@disallow_local_execution_in_containers +async def get_dnf_group_summary( + limit: t.Annotated[ + int, + Field( + description="Maximum number of output lines to return", + gt=0, + examples=[DEFAULT_DNF_LIMIT], + ), + ] = DEFAULT_DNF_LIMIT, + offset: t.Annotated[ + int, + Field( + description="Number of output lines to skip", + ge=0, + examples=[0], + ), + ] = 0, + no_limit: t.Annotated[ + bool, + Field( + description="Disable output truncation", + examples=[False], + ), + ] = False, + host: Host = None, +) -> str: + """Get group summary using dnf.""" + return await _run_dnf_command("dnf_group_summary", host=host, limit=limit, offset=offset, no_limit=no_limit) + + +@mcp.tool( + title="List modules (dnf)", + description="List modules or filter by module name via dnf.", + tags={"packages", "dnf", "troubleshooting"}, + annotations=ToolAnnotations(readOnlyHint=True), +) +@log_tool_call +@disallow_local_execution_in_containers +async def list_dnf_modules( + module: t.Annotated[ + str | None, + BeforeValidator(validate_optional_dnf_module_name), + Field(description="Optional module name filter", examples=["nodejs", "python39"]), + ] = None, + limit: t.Annotated[ + int, + Field( + description="Maximum number of output lines to return", + gt=0, + examples=[DEFAULT_DNF_LIMIT], + ), + ] = DEFAULT_DNF_LIMIT, + offset: t.Annotated[ + int, + Field( + description="Number of output lines to skip", + ge=0, + examples=[0], + ), + ] = 0, + no_limit: t.Annotated[ + bool, + Field( + description="Disable output truncation", + examples=[False], + ), + ] = False, + host: Host = None, +) -> str: + """List modules using dnf.""" + cmd = get_command("dnf_module_list") + returncode, stdout, stderr = await cmd.run(host=host, module=module) + + if module and _matches_any_message(stdout, stderr, ("no matching modules to list", "no match for argument")): + return f"No modules matched '{module}'." + + if returncode != 0: + return f"Error running dnf: {stderr}" + + if is_empty_output(stdout): + return "No output returned by dnf." + + return _apply_output_limits(stdout, limit=limit, offset=offset, no_limit=no_limit) + + +@mcp.tool( + title="Module provides (dnf)", + description="Find modules that provide a specific package via dnf.", + tags={"packages", "dnf", "troubleshooting"}, + annotations=ToolAnnotations(readOnlyHint=True), +) +@log_tool_call +@disallow_local_execution_in_containers +async def dnf_module_provides( + package: t.Annotated[ + str, + BeforeValidator(validate_dnf_package_name), + Field(description="Package name", examples=["python3", "nodejs"]), + ], + host: Host = None, +) -> str: + """Find modules providing a package using dnf.""" + cmd = get_command("dnf_module_provides") + returncode, stdout, stderr = await cmd.run(host=host, package=package) + + if _matches_any_message(stdout, stderr, ("no matching modules", "no match for argument")): + return f"No modules provide '{package}'." + + if returncode != 0: + return f"Error running dnf: {stderr}" + + if is_empty_output(stdout): + return "No output returned by dnf." + + return stdout diff --git a/src/linux_mcp_server/utils/validation.py b/src/linux_mcp_server/utils/validation.py index 9efde7fa..c72bc5bb 100644 --- a/src/linux_mcp_server/utils/validation.py +++ b/src/linux_mcp_server/utils/validation.py @@ -1,3 +1,5 @@ +import re + from pathlib import Path @@ -56,6 +58,131 @@ def validate_path(path: str) -> Path: return Path(path) +def validate_dnf_package_name(value: str) -> str: + """Validate a dnf package identifier for safety. + + Allows a conservative RPM token charset (letters, digits, . _ + : -). + Rejects empty values, whitespace/control characters, leading '-' and slashes. + """ + if not value: + raise ValueError("Package name cannot be empty") + + if any(c in value for c in ["\n", "\r", "\x00", "\t", " "]): + raise ValueError("Package name contains invalid characters") + + if value.startswith("-"): + raise ValueError("Package name cannot start with '-'") + + if "/" in value: + raise ValueError("Package name cannot contain '/'") + + if not re.fullmatch(r"[A-Za-z0-9._+:-]+", value): + raise ValueError("Package name contains invalid characters") + + return value + + +def validate_dnf_repo_id(value: str) -> str: + """Validate a dnf repository identifier for safety. + + Allows a conservative charset (letters, digits, . _ + : -). + Rejects empty values, whitespace/control characters, leading '-' and slashes. + """ + if not value: + raise ValueError("Repository id cannot be empty") + + if any(c in value for c in ["\n", "\r", "\x00", "\t", " "]): + raise ValueError("Repository id contains invalid characters") + + if value.startswith("-"): + raise ValueError("Repository id cannot start with '-'") + + if "/" in value: + raise ValueError("Repository id cannot contain '/'") + + if not re.fullmatch(r"[A-Za-z0-9._+:-]+", value): + raise ValueError("Repository id contains invalid characters") + + return value + + +def validate_dnf_group_name(value: str) -> str: + """Validate a dnf group name for safety. + + Allows letters, digits, spaces and common punctuation used in group names. + Rejects empty values, control characters, and leading '-'. + """ + if not value: + raise ValueError("Group name cannot be empty") + + if any(c in value for c in ["\n", "\r", "\x00", "\t"]): + raise ValueError("Group name contains invalid characters") + + if value.startswith("-"): + raise ValueError("Group name cannot start with '-'") + + if not re.fullmatch(r"[A-Za-z0-9 ._+:'()&,-]+", value): + raise ValueError("Group name contains invalid characters") + + return value + + +def validate_dnf_module_name(value: str) -> str: + """Validate a dnf module name for safety. + + Allows a conservative charset (letters, digits, . _ + : -). + Rejects empty values, whitespace/control characters, leading '-' and slashes. + """ + if not value: + raise ValueError("Module name cannot be empty") + + if any(c in value for c in ["\n", "\r", "\x00", "\t", " "]): + raise ValueError("Module name contains invalid characters") + + if value.startswith("-"): + raise ValueError("Module name cannot start with '-'") + + if "/" in value: + raise ValueError("Module name cannot contain '/'") + + if not re.fullmatch(r"[A-Za-z0-9._+:-]+", value): + raise ValueError("Module name contains invalid characters") + + return value + + +def validate_optional_dnf_module_name(value: str | None) -> str | None: + """Validate an optional dnf module name.""" + if value is None: + return None + + return validate_dnf_module_name(value) + + +def validate_dnf_provides_query(value: str) -> str: + """Validate a dnf provides query for safety. + + Accepts file paths or binary names with optional glob wildcards (*, ?). + Rejects empty values, whitespace/control characters, and leading '-'. + """ + if not value: + raise ValueError("Provides query cannot be empty") + + if any(c in value for c in ["\n", "\r", "\x00", "\t", " "]): + raise ValueError("Provides query contains invalid characters") + + if value.startswith("-"): + raise ValueError("Provides query cannot start with '-'") + + if ".." in value: + raise ValueError("Provides query cannot contain '..'") + + if not re.fullmatch(r"[A-Za-z0-9._+:/@*?-]+", value): + raise ValueError("Provides query contains invalid characters") + + return value + + def is_empty_output(stdout: str | None) -> bool: """Check if command output is empty or whitespace-only. diff --git a/tests/tools/test_dnf.py b/tests/tools/test_dnf.py new file mode 100644 index 00000000..3c2004fa --- /dev/null +++ b/tests/tools/test_dnf.py @@ -0,0 +1,543 @@ +"""Tests for dnf package manager tools.""" + +import pytest + +from linux_mcp_server.utils.validation import validate_dnf_group_name +from linux_mcp_server.utils.validation import validate_dnf_module_name +from linux_mcp_server.utils.validation import validate_dnf_package_name +from linux_mcp_server.utils.validation import validate_dnf_provides_query +from linux_mcp_server.utils.validation import validate_dnf_repo_id +from linux_mcp_server.utils.validation import validate_optional_dnf_module_name + + +class TestDnfValidation: + @pytest.mark.parametrize( + ("value", "expected"), + [ + ("bash", "bash"), + ("openssl-libs", "openssl-libs"), + ("python3.12", "python3.12"), + ("glibc:2.28", "glibc:2.28"), + ], + ) + def test_validate_dnf_package_name_valid(self, value, expected): + assert validate_dnf_package_name(value) == expected + + @pytest.mark.parametrize( + "value", + [ + "", + " ", + "bad name", + "bad\tname", + "bad\nname", + "-bad", + "bad/name", + "bad*name", + "bad?name", + ], + ) + def test_validate_dnf_package_name_invalid(self, value): + with pytest.raises(ValueError): + validate_dnf_package_name(value) + + @pytest.mark.parametrize( + ("value", "expected"), + [ + ("baseos", "baseos"), + ("appstream", "appstream"), + ("custom-repo", "custom-repo"), + ("repo:1", "repo:1"), + ], + ) + def test_validate_dnf_repo_id_valid(self, value, expected): + assert validate_dnf_repo_id(value) == expected + + @pytest.mark.parametrize( + "value", + [ + "", + " ", + "bad repo", + "bad\trepo", + "bad\nrepo", + "-bad", + "bad/repo", + "bad*repo", + ], + ) + def test_validate_dnf_repo_id_invalid(self, value): + with pytest.raises(ValueError): + validate_dnf_repo_id(value) + + @pytest.mark.parametrize( + ("value", "expected"), + [ + ("Development Tools", "Development Tools"), + ("Server with GUI", "Server with GUI"), + ("Container Management", "Container Management"), + ("Workstation & GUI", "Workstation & GUI"), + ], + ) + def test_validate_dnf_group_name_valid(self, value, expected): + assert validate_dnf_group_name(value) == expected + + @pytest.mark.parametrize( + "value", + [ + "", + "\t", + "bad\tname", + "bad\nname", + "-bad", + "bad/name", + "bad*name", + ], + ) + def test_validate_dnf_group_name_invalid(self, value): + with pytest.raises(ValueError): + validate_dnf_group_name(value) + + @pytest.mark.parametrize( + ("value", "expected"), + [ + ("nodejs", "nodejs"), + ("python39", "python39"), + ("nodejs:18", "nodejs:18"), + ], + ) + def test_validate_dnf_module_name_valid(self, value, expected): + assert validate_dnf_module_name(value) == expected + + @pytest.mark.parametrize( + "value", + [ + "", + " ", + "bad name", + "-bad", + "bad/name", + "bad*name", + ], + ) + def test_validate_dnf_module_name_invalid(self, value): + with pytest.raises(ValueError): + validate_dnf_module_name(value) + + def test_validate_optional_dnf_module_name_none(self): + assert validate_optional_dnf_module_name(None) is None + + @pytest.mark.parametrize( + ("value", "expected"), + [ + ("/usr/bin/python3", "/usr/bin/python3"), + ("libssl.so.3", "libssl.so.3"), + ("*/libssl.so.*", "*/libssl.so.*"), + ("usr/lib64/libcrypto.so.3", "usr/lib64/libcrypto.so.3"), + ], + ) + def test_validate_dnf_provides_query_valid(self, value, expected): + assert validate_dnf_provides_query(value) == expected + + @pytest.mark.parametrize( + "value", + [ + "", + " ", + "bad name", + "bad\tname", + "bad\nname", + "-bad", + "../usr/bin/bash", + "bad|name", + ], + ) + def test_validate_dnf_provides_query_invalid(self, value): + with pytest.raises(ValueError): + validate_dnf_provides_query(value) + + +class TestDnfToolsRemote: + @pytest.mark.parametrize( + "tool_name", + [ + "list_dnf_installed_packages", + "list_dnf_available_packages", + "list_dnf_repositories", + "list_dnf_groups", + "get_dnf_group_summary", + ], + ) + async def test_dnf_list_tools_success(self, mcp_client, mock_execute_with_fallback, tool_name): + mock_execute_with_fallback.return_value = (0, "Some dnf output", "") + + result = await mcp_client.call_tool( + tool_name, + arguments={"host": "remote.example.com"}, + ) + result_text = result.content[0].text + + assert "Some dnf output" in result_text + call_kwargs = mock_execute_with_fallback.call_args[1] + assert call_kwargs["host"] == "remote.example.com" + + async def test_dnf_list_tools_empty_output(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (0, " ", "") + + result = await mcp_client.call_tool( + "list_dnf_installed_packages", + arguments={"host": "remote.example.com"}, + ) + result_text = result.content[0].text.casefold() + + assert "no output returned" in result_text + + async def test_dnf_list_tools_error(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (1, "", "dnf error") + + result = await mcp_client.call_tool( + "list_dnf_available_packages", + arguments={"host": "remote.example.com"}, + ) + result_text = result.content[0].text.casefold() + + assert "error running dnf" in result_text + assert "dnf error" in result_text + + async def test_dnf_list_tools_truncates_output(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (0, "line1\nline2\nline3\nline4", "") + + result = await mcp_client.call_tool( + "list_dnf_installed_packages", + arguments={"host": "remote.example.com", "limit": 2, "offset": 1}, + ) + result_text = result.content[0].text + + assert result_text.startswith("line2\nline3") + assert "output truncated" in result_text + + async def test_dnf_list_tools_no_limit(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (0, "line1\nline2\nline3", "") + + result = await mcp_client.call_tool( + "list_dnf_available_packages", + arguments={"host": "remote.example.com", "limit": 1, "no_limit": True}, + ) + result_text = result.content[0].text + + assert result_text == "line1\nline2\nline3" + + async def test_dnf_list_tools_no_limit_offset_out_of_range(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (0, "line1\nline2", "") + + result = await mcp_client.call_tool( + "list_dnf_installed_packages", + arguments={"host": "remote.example.com", "no_limit": True, "offset": 10}, + ) + result_text = result.content[0].text.casefold() + + assert "no output after applying limit/offset" in result_text + + async def test_dnf_list_tools_no_limit_offset_slices(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (0, "line1\nline2\nline3", "") + + result = await mcp_client.call_tool( + "list_dnf_installed_packages", + arguments={"host": "remote.example.com", "no_limit": True, "offset": 1}, + ) + result_text = result.content[0].text + + assert result_text == "line2\nline3" + + async def test_dnf_list_tools_offset_out_of_range(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (0, "line1\nline2", "") + + result = await mcp_client.call_tool( + "list_dnf_repositories", + arguments={"host": "remote.example.com", "limit": 5, "offset": 10}, + ) + result_text = result.content[0].text.casefold() + + assert "no output after applying limit/offset" in result_text + + async def test_dnf_list_tools_invalid_limit(self, mcp_client, mock_execute_with_fallback): + with pytest.raises(Exception) as exc_info: + await mcp_client.call_tool( + "list_dnf_installed_packages", + arguments={"host": "remote.example.com", "limit": 0}, + ) + + assert "validation" in str(exc_info.value).casefold() or "invalid" in str(exc_info.value).casefold() + mock_execute_with_fallback.assert_not_called() + + async def test_dnf_list_tools_invalid_offset(self, mcp_client, mock_execute_with_fallback): + with pytest.raises(Exception) as exc_info: + await mcp_client.call_tool( + "list_dnf_available_packages", + arguments={"host": "remote.example.com", "offset": -1}, + ) + + assert "validation" in str(exc_info.value).casefold() or "invalid" in str(exc_info.value).casefold() + mock_execute_with_fallback.assert_not_called() + + async def test_get_dnf_package_info_success(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (0, "Name : bash", "") + + result = await mcp_client.call_tool( + "get_dnf_package_info", + arguments={"package": "bash", "host": "remote.example.com"}, + ) + result_text = result.content[0].text + + assert "Name : bash" in result_text + + async def test_get_dnf_package_info_not_found(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (1, "", "No match for argument: missing") + + result = await mcp_client.call_tool( + "get_dnf_package_info", + arguments={"package": "missing", "host": "remote.example.com"}, + ) + result_text = result.content[0].text.casefold() + + assert "not found" in result_text + + async def test_get_dnf_package_info_error(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (1, "", "dnf error") + + result = await mcp_client.call_tool( + "get_dnf_package_info", + arguments={"package": "bash", "host": "remote.example.com"}, + ) + result_text = result.content[0].text.casefold() + + assert "error running dnf" in result_text + + async def test_get_dnf_package_info_empty_output(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (0, " ", "") + + result = await mcp_client.call_tool( + "get_dnf_package_info", + arguments={"package": "bash", "host": "remote.example.com"}, + ) + result_text = result.content[0].text.casefold() + + assert "no output returned" in result_text + + async def test_get_dnf_package_info_invalid_package_name(self, mcp_client, mock_execute_with_fallback): + with pytest.raises(Exception) as exc_info: + await mcp_client.call_tool( + "get_dnf_package_info", + arguments={"package": "bad name", "host": "remote.example.com"}, + ) + + assert "validation" in str(exc_info.value).casefold() or "invalid" in str(exc_info.value).casefold() + mock_execute_with_fallback.assert_not_called() + + async def test_dnf_provides_success(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (0, "file provided by", "") + + result = await mcp_client.call_tool( + "dnf_provides", + arguments={"query": "/usr/bin/python3", "host": "remote.example.com"}, + ) + result_text = result.content[0].text + + assert "file provided by" in result_text + + async def test_dnf_provides_not_found(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (1, "", "No matches found") + + result = await mcp_client.call_tool( + "dnf_provides", + arguments={"query": "/missing/file", "host": "remote.example.com"}, + ) + result_text = result.content[0].text.casefold() + + assert "no packages provide" in result_text + + async def test_dnf_provides_invalid_query(self, mcp_client, mock_execute_with_fallback): + with pytest.raises(Exception) as exc_info: + await mcp_client.call_tool( + "dnf_provides", + arguments={"query": "bad name", "host": "remote.example.com"}, + ) + + assert "validation" in str(exc_info.value).casefold() or "invalid" in str(exc_info.value).casefold() + mock_execute_with_fallback.assert_not_called() + + async def test_get_dnf_repo_info_success(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (0, "Repo-id : baseos", "") + + result = await mcp_client.call_tool( + "get_dnf_repo_info", + arguments={"repo_id": "baseos", "host": "remote.example.com"}, + ) + result_text = result.content[0].text + + assert "Repo-id" in result_text + + async def test_get_dnf_repo_info_not_found(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (1, "", "No matching repo to modify: missing") + + result = await mcp_client.call_tool( + "get_dnf_repo_info", + arguments={"repo_id": "missing", "host": "remote.example.com"}, + ) + result_text = result.content[0].text.casefold() + + assert "not found" in result_text + + async def test_get_dnf_repo_info_invalid_repo_id(self, mcp_client, mock_execute_with_fallback): + with pytest.raises(Exception) as exc_info: + await mcp_client.call_tool( + "get_dnf_repo_info", + arguments={"repo_id": "bad repo", "host": "remote.example.com"}, + ) + + assert "validation" in str(exc_info.value).casefold() or "invalid" in str(exc_info.value).casefold() + mock_execute_with_fallback.assert_not_called() + + async def test_get_dnf_group_info_success(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (0, "Group: Development Tools", "") + + result = await mcp_client.call_tool( + "get_dnf_group_info", + arguments={"group": "Development Tools", "host": "remote.example.com"}, + ) + result_text = result.content[0].text + + assert "Development Tools" in result_text + + async def test_get_dnf_group_info_not_found(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (1, "", "No groups matched: Missing Group") + + result = await mcp_client.call_tool( + "get_dnf_group_info", + arguments={"group": "Missing Group", "host": "remote.example.com"}, + ) + result_text = result.content[0].text.casefold() + + assert "not found" in result_text + + async def test_get_dnf_group_info_invalid_name(self, mcp_client, mock_execute_with_fallback): + with pytest.raises(Exception) as exc_info: + await mcp_client.call_tool( + "get_dnf_group_info", + arguments={"group": "bad/name", "host": "remote.example.com"}, + ) + + assert "validation" in str(exc_info.value).casefold() or "invalid" in str(exc_info.value).casefold() + mock_execute_with_fallback.assert_not_called() + + async def test_list_dnf_modules_success(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (0, "Name Stream Profiles", "") + + result = await mcp_client.call_tool( + "list_dnf_modules", + arguments={"host": "remote.example.com"}, + ) + result_text = result.content[0].text + + assert "Name Stream Profiles" in result_text + + async def test_list_dnf_modules_truncates_output(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (0, "line1\nline2\nline3\nline4", "") + + result = await mcp_client.call_tool( + "list_dnf_modules", + arguments={"module": "nodejs", "limit": 2, "offset": 1, "host": "remote.example.com"}, + ) + result_text = result.content[0].text + + assert result_text.startswith("line2\nline3") + assert "output truncated" in result_text + + async def test_list_dnf_modules_filtered_not_found(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (0, "No matching modules to list", "") + + result = await mcp_client.call_tool( + "list_dnf_modules", + arguments={"module": "missing", "host": "remote.example.com"}, + ) + result_text = result.content[0].text.casefold() + + assert "no modules matched" in result_text + + async def test_list_dnf_modules_invalid_name(self, mcp_client, mock_execute_with_fallback): + with pytest.raises(Exception) as exc_info: + await mcp_client.call_tool( + "list_dnf_modules", + arguments={"module": "bad name", "host": "remote.example.com"}, + ) + + assert "validation" in str(exc_info.value).casefold() or "invalid" in str(exc_info.value).casefold() + mock_execute_with_fallback.assert_not_called() + + async def test_dnf_module_provides_success(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (0, "module provides package", "") + + result = await mcp_client.call_tool( + "dnf_module_provides", + arguments={"package": "python3", "host": "remote.example.com"}, + ) + result_text = result.content[0].text + + assert "module provides package" in result_text + + async def test_dnf_module_provides_not_found(self, mcp_client, mock_execute_with_fallback): + mock_execute_with_fallback.return_value = (1, "", "No matching modules") + + result = await mcp_client.call_tool( + "dnf_module_provides", + arguments={"package": "missing", "host": "remote.example.com"}, + ) + result_text = result.content[0].text.casefold() + + assert "no modules provide" in result_text + + async def test_dnf_module_provides_invalid_package(self, mcp_client, mock_execute_with_fallback): + with pytest.raises(Exception) as exc_info: + await mcp_client.call_tool( + "dnf_module_provides", + arguments={"package": "bad name", "host": "remote.example.com"}, + ) + + assert "validation" in str(exc_info.value).casefold() or "invalid" in str(exc_info.value).casefold() + mock_execute_with_fallback.assert_not_called() + + @pytest.mark.parametrize( + ("tool_name", "arguments"), + [ + ("dnf_provides", {"query": "/usr/bin/python3", "host": "remote.example.com"}), + ("get_dnf_repo_info", {"repo_id": "baseos", "host": "remote.example.com"}), + ("get_dnf_group_info", {"group": "Development Tools", "host": "remote.example.com"}), + ("list_dnf_modules", {"module": "nodejs", "host": "remote.example.com"}), + ("dnf_module_provides", {"package": "python3", "host": "remote.example.com"}), + ], + ) + async def test_dnf_new_tools_error(self, mcp_client, mock_execute_with_fallback, tool_name, arguments): + mock_execute_with_fallback.return_value = (1, "", "dnf error") + + result = await mcp_client.call_tool(tool_name, arguments=arguments) + result_text = result.content[0].text.casefold() + + assert "error running dnf" in result_text + + @pytest.mark.parametrize( + ("tool_name", "arguments"), + [ + ("dnf_provides", {"query": "/usr/bin/python3", "host": "remote.example.com"}), + ("get_dnf_repo_info", {"repo_id": "baseos", "host": "remote.example.com"}), + ("get_dnf_group_info", {"group": "Development Tools", "host": "remote.example.com"}), + ("list_dnf_modules", {"module": "nodejs", "host": "remote.example.com"}), + ("dnf_module_provides", {"package": "python3", "host": "remote.example.com"}), + ], + ) + async def test_dnf_new_tools_empty_output(self, mcp_client, mock_execute_with_fallback, tool_name, arguments): + mock_execute_with_fallback.return_value = (0, " ", "") + + result = await mcp_client.call_tool(tool_name, arguments=arguments) + result_text = result.content[0].text.casefold() + + assert "no output returned" in result_text