diff --git a/CODEX-FIX-SPEC-2.md b/CODEX-FIX-SPEC-2.md
new file mode 100644
index 0000000..24d0938
--- /dev/null
+++ b/CODEX-FIX-SPEC-2.md
@@ -0,0 +1,48 @@
+# Codex Remediation Task #2: space-hog
+
+You are tasked with fixing the remaining architectural, security, and quality issues found in the post-Codex audit. Please carefully apply the following fixes:
+
+## 1. Eliminate `shell=True` in `stats.py`
+- **File:** `space_hog/stats.py`
+- **Issue:** `run_cleanup` executes `subprocess.run(command, shell=True)`. This bypasses all the safe deletion logic and leaves the app vulnerable to command injection.
+- **Fix:** Remove `shell=True`. Change the signature of `run_cleanup` to accept a list of strings if necessary, or use `shlex.split(command)` to parse the string into a list before passing it to `subprocess.run`. Ensure that `shell=False` is used implicitly or explicitly.
+
+## 2. Fix AppleScript Injection in `memory.py`
+- **File:** `space_hog/memory.py`
+- **Issue:** `remove_login_item` interpolates `app_name` into an AppleScript block executed via `osascript`.
+- **Fix:** Strictly sanitize `app_name` before interpolation using an allowlist regex (e.g., `import re; app_name = re.sub(r'[^a-zA-Z0-9 ._-]', '', app_name)`) to prevent AppleScript injection.
+
+## 3. Fix TOCTOU Race and State-After-Mutation in `safe_delete.py`
+- **File:** `space_hog/safe_delete.py`
+- **Issue 1:** `target.is_file()` is called *after* `send2trash.send2trash(str(target))`, by which point the file has already been moved. This causes all deleted files to be incorrectly logged as directories.
+- **Issue 2:** The `is_symlink()` check is separated from `send2trash`, creating a TOCTOU race.
+- **Fix:** In `move_to_trash()`, evaluate and store `is_file = target.is_file()` *before* calling `send2trash`. Pass `is_file` to `_record_removal` (e.g., `'file' if is_file else 'directory'`). If possible, ensure no symlinks are passed to `send2trash` by checking `target.is_symlink()` immediately before, though the primary fix is the `is_file` ordering.
+
+## 4. Fix Symlink Traversal in `utils.py`
+- **File:** `space_hog/utils.py`
+- **Issue:** `get_dir_size` uses `path.rglob('*')`, which follows directory symlinks on Python < 3.13.
+- **Fix:** Rewrite `get_dir_size` to use `os.walk(path, topdown=True, followlinks=False)` or `os.scandir` to safely traverse the directory without following symlinks. Sum the sizes of the files found.
+
+## 5. Improve Docker Label Sanitization
+- **File:** `space_hog/docker.py`
+- **Issue:** `_sanitize_label_text` only strips control characters, allowing shell metacharacters such as `;`, `|`, and `$()` through.
+- **Fix:** Change the regex to a strict allowlist: `cleaned = re.sub(r'[^a-zA-Z0-9_.-]', '', text).strip()`.
+
+## 6. Fix `os.walk` Issues in `scanners.py`
+- **File:** `space_hog/scanners.py`
+- **Issue 1:** Missing `onerror` handler in `os.walk`, causing `PermissionError`s to silently abort the entire scan.
+- **Fix 1:** Add an `onerror` callback to `os.walk` (e.g., `onerror=lambda e: None`, or better, one that logs a warning) so traversals continue past inaccessible directories.
+- **Issue 2:** The pruning logic (`dirnames[:] = []`) prevents finding nested space hogs (e.g., `node_modules` inside `.venv`).
+- **Fix 2:** Instead of clearing `dirnames` completely, selectively remove the matched directory name from `dirnames`, or adjust the logic so it can still process independent nested hogs.
+
+## 7. Fix Silent Exception Swallowing
+- **File:** Across the codebase (e.g., `safe_delete.py`, `utils.py`, `caches.py`).
+- **Issue:** Too many `except Exception: pass` blocks hide errors.
+- **Fix:** Where appropriate, change `except Exception:` to `except Exception as e:` and log the failure, e.g. `logging.warning(f"Error: {e}")` (with `logging` imported at module level), so errors are at least visible.
+
+## 8. Fix Build Metadata and Dependencies
+- **File:** `pyproject.toml` and `space_hog/__init__.py`
+- **Issue:** Version mismatch (`0.2.0` vs `0.5.0`). Missing `pytest` and `docker` dependencies.
+- **Fix:** Update the `pyproject.toml` version to `0.5.0` to match `__init__.py`. Add `docker` to `dependencies`. Add `[project.optional-dependencies]` with `test = ["pytest"]`.
+
+Apply these fixes comprehensively and carefully.
diff --git a/CODEX-FIX-SPEC.md b/CODEX-FIX-SPEC.md
new file mode 100644
index 0000000..6ae8983
--- /dev/null
+++ b/CODEX-FIX-SPEC.md
@@ -0,0 +1,30 @@
+# Codex Remediation Task: space-hog
+
+You are tasked with remediating critical security and performance vulnerabilities in the `space-hog` repository. Apply the following fixes:
+
+## 1. Eliminate Command Injection (RCE)
+- **File:** `space_hog/safe_delete.py`
+- **Issue:** The `move_to_trash` function uses string interpolation inside an AppleScript block (`osascript`) to delete files. This allows arbitrary code execution if a filename contains quotes or AppleScript commands.
+- **Fix:** Replace the entire `osascript` block with the `send2trash` Python library. Add `send2trash` to the `pyproject.toml` dependencies (e.g., `send2trash>=1.8.0`). Make sure to `import send2trash` and use `send2trash.send2trash(str(target))`.
+
+## 2. Prevent Symlink Traversal (Arbitrary File Deletion)
+- **File:** `space_hog/safe_delete.py` and `space_hog/scanners.py`
+- **Issue:** Directory traversals (e.g., `target.iterdir()` or `rglob`) follow symlinks blindly. If a directory contains a symlink to `/usr/local`, the tool might delete system files.
+- **Fix:** Add `is_symlink()` checks. For `safe_delete.py`'s `trash_contents`, skip items where `item.is_symlink()` is true and append a warning to the `errors` list.
+
+## 3. Fix Unbounded Resource Consumption (Performance / DoS)
+- **File:** `space_hog/scanners.py`
+- **Issue:** `find_space_hogs` iterates over `SPACE_HOG_PATTERNS` and calls `root.rglob(pattern)` for each one (17 separate full-disk traversals). This takes `O(P * N)` time.
+- **Fix:** Refactor `find_space_hogs` to use a single `os.walk(root)` pass. During the pass, check whether the current directory name matches any key in `SPACE_HOG_PATTERNS`. If it does, calculate its size, add it to the results, and remove it from `dirnames` so `os.walk` doesn't recurse into it. Also, skip any symlinks using `os.path.islink`.
+
+## 4. Fix Silent Exception Swallowing
+- **File:** `space_hog/safe_delete.py`
+- **Issue:** The `_record_removal` function has an empty `except Exception: pass` block.
+- **Fix:** Change it to `except Exception as e:` with `logging.warning(f"Failed to record removal: {e}")` (importing `logging` at module level) to ensure failures are visible without crashing the app.
+
+## 5. Prevent Docker Label Injection
+- **File:** `space_hog/docker.py`
+- **Issue:** `analyze_docker_volumes` parses project names from Docker labels, which are later embedded in suggested CLI commands without sanitization.
+- **Fix:** Sanitize the extracted `project` variable by stripping control characters (e.g., using a regex like `re.sub(r'[\x00-\x1f\x7f]', '', text)`). Also, update `print_docker_analysis` to use `shlex.quote(proj)` when generating the `docker volume rm` suggestion.
+
+Please thoroughly apply these changes. Ensure all code remains functional.
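The single-pass traversal that item 3 of CODEX-FIX-SPEC.md describes can be sketched as follows. This is illustrative only, not part of the patch: `SPACE_HOG_PATTERNS` here is a two-entry stub (the real table has ~17 entries), and the repo's version adds a minimum-size threshold and richer error reporting.

```python
import logging
import os
from pathlib import Path

# Stub pattern table for illustration; the real project defines many more entries.
SPACE_HOG_PATTERNS = {"node_modules": "Node packages", ".venv": "Python virtualenv"}


def _dir_size(path: Path) -> int:
    """Sum regular-file sizes without following symlinks."""
    total = 0
    for dirpath, dirnames, filenames in os.walk(path, followlinks=False):
        dirnames[:] = [d for d in dirnames
                       if not os.path.islink(os.path.join(dirpath, d))]
        for name in filenames:
            full = os.path.join(dirpath, name)
            if not os.path.islink(full):
                try:
                    total += os.path.getsize(full)
                except OSError:
                    pass  # unreadable file: simply not counted
    return total


def find_space_hogs(root: Path) -> list[tuple[Path, int, str]]:
    """One os.walk pass over root instead of one rglob per pattern."""
    results: list[tuple[Path, int, str]] = []

    def _on_error(error: OSError) -> None:
        # Log and keep walking instead of silently dropping the subtree.
        logging.warning("Skipping inaccessible path: %s", error)

    for dirpath, dirnames, _ in os.walk(root, topdown=True,
                                        followlinks=False, onerror=_on_error):
        # Never descend into symlinked directories.
        dirnames[:] = [d for d in dirnames
                       if not os.path.islink(os.path.join(dirpath, d))]
        for name in list(dirnames):
            if name in SPACE_HOG_PATTERNS:
                hog = Path(dirpath) / name
                results.append((hog, _dir_size(hog), SPACE_HOG_PATTERNS[name]))
                dirnames.remove(name)  # prune: don't recurse into the hog
    return sorted(results, key=lambda r: r[1], reverse=True)
```

Pruning `dirnames` in a `topdown=True` walk is what keeps this to a single traversal: once a hog is recorded, `os.walk` never descends into it, so the cost drops from `O(P * N)` to `O(N)`.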
diff --git a/pyproject.toml b/pyproject.toml index 1a60f17..5665a15 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "space-hog" -version = "0.2.0" +version = "0.5.0" description = "Find and reclaim wasted disk space on macOS" readme = "README.md" requires-python = ">=3.10" @@ -13,6 +13,10 @@ authors = [ {name = "dshanklin-bv"} ] keywords = ["disk", "space", "cleanup", "macos", "cli"] +dependencies = [ + "send2trash>=1.8.0", + "psutil>=5.9.0", +] classifiers = [ "Development Status :: 4 - Beta", "Environment :: Console", @@ -33,3 +37,8 @@ space-hog = "space_hog:main" [project.urls] Homepage = "https://github.com/rhea-impact/space-hog" Repository = "https://github.com/rhea-impact/space-hog" + +[project.optional-dependencies] +test = [ + "pytest", +] diff --git a/space_hog/caches.py b/space_hog/caches.py index 49a01d8..79b6db5 100644 --- a/space_hog/caches.py +++ b/space_hog/caches.py @@ -1,5 +1,6 @@ """Cache analysis for Space Hog.""" +import logging import time from pathlib import Path @@ -18,8 +19,8 @@ def check_caches() -> list[tuple[Path, int, str]]: size = get_dir_size(path) if size > 0: results.append((path, size, description)) - except (PermissionError, OSError): - pass + except (PermissionError, OSError) as e: + logging.warning(f"Failed to inspect cache location {path}: {e}") return sorted(results, key=lambda x: x[1], reverse=True) @@ -50,9 +51,9 @@ def get_downloads_analysis(min_age_days: int = 30) -> tuple[int, list[FileInfo]] if stat.st_mtime < cutoff: old_files.append(FileInfo(entry, stat.st_size)) total_size += stat.st_size - except (PermissionError, OSError): - pass - except (PermissionError, OSError): - pass + except (PermissionError, OSError) as e: + logging.warning(f"Failed to inspect Downloads file {entry}: {e}") + except (PermissionError, OSError) as e: + logging.warning(f"Failed to read Downloads directory {downloads}: {e}") return total_size, sorted(old_files, key=lambda x: 
x.size, reverse=True) diff --git a/space_hog/cli.py b/space_hog/cli.py index 7e9761d..85c3b5e 100644 --- a/space_hog/cli.py +++ b/space_hog/cli.py @@ -140,7 +140,8 @@ def print_full_report(): def run_tier_cleanup(tier: int = 1, dry_run: bool = False): """Run cleanup for a specific tier.""" from .advisor import collect_cleanup_opportunities, calculate_tier_savings - from .stats import run_cleanup, start_cleanup_session, end_cleanup_session, print_post_cleanup_summary + from .safe_delete import safe_cleanup + from .stats import start_cleanup_session, end_cleanup_session, print_post_cleanup_summary print_header(f"TIER {tier} CLEANUP" + (" (DRY RUN)" if dry_run else "")) @@ -181,10 +182,10 @@ def run_tier_cleanup(tier: int = 1, dry_run: bool = False): for item in tier_info['items']: print(f" Cleaning {item['name']}...", end=' ', flush=True) - result = run_cleanup( + result = safe_cleanup( command=item['command'], description=item['name'], - category=item.get('category_key', 'manual') + category=item.get('category_key', 'manual'), ) if result['success']: print(f"{c.GREEN}OK{c.RESET} ({result['bytes_freed_human']})") diff --git a/space_hog/docker.py b/space_hog/docker.py index 88d75ad..1643313 100644 --- a/space_hog/docker.py +++ b/space_hog/docker.py @@ -1,12 +1,24 @@ """Docker disk analysis for Space Hog.""" import json +import logging +import re +import shlex import subprocess from pathlib import Path from .utils import format_size, Colors +def _sanitize_label_text(text: str | None) -> str | None: + """Allow only safe label characters for output and shell use.""" + if not text: + return None + if re.fullmatch(r'^[a-zA-Z0-9_.-]+$', text): + return text + return None + + def analyze_docker() -> dict: """Analyze Docker disk usage including VM disk bloat.""" result = { @@ -103,8 +115,8 @@ def analyze_docker() -> dict: except json.JSONDecodeError: continue - except (subprocess.CalledProcessError, subprocess.TimeoutExpired): - pass + except (subprocess.CalledProcessError, 
subprocess.TimeoutExpired) as e: + logging.warning(f"Failed to inspect Docker disk usage: {e}") # Calculate totals result['total_usage'] = ( @@ -181,13 +193,14 @@ def analyze_docker_volumes() -> list[dict]: if 'com.supabase.cli.project=' in labels: for part in labels.split(','): if 'com.supabase.cli.project=' in part: - project = part.split('=')[1] + project = part.split('=', 1)[1] break elif 'com.docker.compose.project=' in labels: for part in labels.split(','): if 'com.docker.compose.project=' in part: - project = part.split('=')[1] + project = part.split('=', 1)[1] break + project = _sanitize_label_text(project) # Parse size size_str = vol.get('Size', '0B') @@ -204,8 +217,8 @@ def analyze_docker_volumes() -> list[dict]: 'driver': vol.get('Driver', 'local'), }) - except (subprocess.CalledProcessError, subprocess.TimeoutExpired, json.JSONDecodeError): - pass + except (subprocess.CalledProcessError, subprocess.TimeoutExpired, json.JSONDecodeError) as e: + logging.warning(f"Failed to inspect Docker volumes: {e}") return sorted(volumes, key=lambda x: -x['size']) @@ -333,7 +346,10 @@ def print_docker_analysis(): orphan_size = sum(v['size'] for v in orphaned) print(f" {c.YELLOW}Orphaned volumes (no running containers): {format_size(orphan_size)}{c.RESET}") print(f" To remove orphaned volumes for a project:") - print(f" docker volume rm $(docker volume ls -q -f 'label=com.docker.compose.project=PROJECT_NAME')") + orphan_projects = sorted({v.get('project') for v in orphaned if v.get('project')}) + for proj in orphan_projects: + quoted_project = shlex.quote(proj) + print(f" docker volume rm $(docker volume ls -q -f label=com.docker.compose.project={quoted_project})") print() # JSON output for AI diff --git a/space_hog/memory.py b/space_hog/memory.py index 2ef6077..f5ff90c 100644 --- a/space_hog/memory.py +++ b/space_hog/memory.py @@ -3,6 +3,8 @@ Tracks RAM consumers, autostart items, and background processes. 
""" +import logging +import re import subprocess from pathlib import Path from typing import Optional @@ -65,8 +67,8 @@ def get_login_items() -> list[str]: ) if result.returncode == 0 and result.stdout.strip(): return [item.strip() for item in result.stdout.strip().split(',')] - except (subprocess.TimeoutExpired, Exception): - pass + except (subprocess.TimeoutExpired, Exception) as e: + logging.warning(f"Failed to read login items: {e}") return [] @@ -85,7 +87,8 @@ def get_launch_agents() -> list[dict]: capture_output=True, text=True, timeout=5 ) is_running = result.returncode == 0 - except: + except Exception as e: + logging.warning(f"Failed to inspect launch agent {label}: {e}") is_running = False agents.append({ @@ -114,7 +117,8 @@ def get_launch_daemons() -> list[dict]: capture_output=True, text=True, timeout=5 ) is_running = result.returncode == 0 - except: + except Exception as e: + logging.warning(f"Failed to inspect launch daemon {label}: {e}") is_running = False daemons.append({ @@ -298,6 +302,9 @@ def remove_login_item(app_name: str) -> dict: Returns dict with success status. """ + if not re.fullmatch(r"^[a-zA-Z0-9 ._\-'&]+$", app_name): + return {'success': False, 'error': 'Invalid app name'} + try: result = subprocess.run( ['osascript', '-e', diff --git a/space_hog/safe_delete.py b/space_hog/safe_delete.py index 3cddfdd..76b5288 100644 --- a/space_hog/safe_delete.py +++ b/space_hog/safe_delete.py @@ -3,12 +3,18 @@ Moves files to Trash instead of permanent deletion, allowing recovery. 
""" +import logging import os +import shlex import shutil -import subprocess +import stat from pathlib import Path from datetime import datetime -from typing import Optional + +try: + import send2trash +except ImportError: # pragma: no cover - dependency declared in pyproject.toml + send2trash = None from .utils import format_size @@ -18,8 +24,63 @@ def _record_removal(item_name: str, item_type: str, size_bytes: int): try: from .preferences import record_decision record_decision(item_type, item_name, 'removed', size_bytes) - except Exception: - pass # Don't fail the deletion if recording fails + except Exception as e: + logging.warning(f"Failed to record removal: {e}") + + +def _estimate_path_size(target: Path, target_stat: os.stat_result) -> int: + """Estimate bytes represented by a target path without following symlinks.""" + if stat.S_ISREG(target_stat.st_mode): + return target_stat.st_size + if not stat.S_ISDIR(target_stat.st_mode): + return 0 + + total_size = 0 + + def _walk_error(error: OSError): + logging.warning(f"Failed to inspect {getattr(error, 'filename', target)}: {error}") + + for dirpath, dirnames, filenames in os.walk(target, topdown=True, followlinks=False, onerror=_walk_error): + dirnames[:] = [ + d for d in dirnames + if not os.path.islink(os.path.join(dirpath, d)) + ] + for name in filenames: + file_path = Path(dirpath) / name + try: + file_stat = file_path.lstat() + if stat.S_ISREG(file_stat.st_mode): + total_size += file_stat.st_size + except (PermissionError, OSError) as e: + logging.warning(f"Failed to inspect file {file_path}: {e}") + + return total_size + + +def _is_allowed_trash_target(target: Path) -> bool: + """Validate target is within approved safe base paths.""" + if target == Path('/'): + return False + + try: + resolved_target = target.resolve(strict=True) + except (PermissionError, OSError): + return False + + safe_bases = ( + Path.home().resolve(), + Path('/Applications'), + Path('/Library/Caches'), + ) + + for base in 
safe_bases: + try: + resolved_target.relative_to(base.resolve(strict=False)) + return True + except ValueError: + continue + + return False def move_to_trash(path: str, dry_run: bool = False) -> dict: @@ -33,23 +94,50 @@ def move_to_trash(path: str, dry_run: bool = False) -> dict: dict with 'success', 'message', 'bytes_freed' (estimated) """ target = Path(path).expanduser() - - if not target.exists(): + try: + target_stat = target.lstat() + except FileNotFoundError: return { 'success': False, 'message': f'Path does not exist: {path}', 'bytes_freed': 0, + 'bytes_freed_human': format_size(0), + 'dry_run': dry_run, + } + except (PermissionError, OSError) as e: + logging.warning(f"Failed to inspect target {target}: {e}") + return { + 'success': False, + 'message': f'Failed to inspect path: {e}', + 'bytes_freed': 0, + 'bytes_freed_human': format_size(0), + 'dry_run': dry_run, + } + + if stat.S_ISLNK(target_stat.st_mode): + return { + 'success': False, + 'message': f'Refusing to trash symlink: {path}', + 'bytes_freed': 0, + 'bytes_freed_human': format_size(0), + 'dry_run': dry_run, + } + + if not _is_allowed_trash_target(target): + return { + 'success': False, + 'message': f'Refusing to trash path outside safe locations: {path}', + 'bytes_freed': 0, + 'bytes_freed_human': format_size(0), 'dry_run': dry_run, } - # Calculate size before moving try: - if target.is_file(): - size = target.stat().st_size - else: - size = sum(f.stat().st_size for f in target.rglob('*') if f.is_file()) - except (PermissionError, OSError): + size = _estimate_path_size(target, target_stat) + except Exception as e: + logging.warning(f"Failed to calculate size for {target}: {e}") size = 0 + is_file = stat.S_ISREG(target_stat.st_mode) if dry_run: return { @@ -60,48 +148,50 @@ def move_to_trash(path: str, dry_run: bool = False) -> dict: 'dry_run': True, } - # Use macOS Finder to move to Trash (proper Trash behavior with undo support) try: - # AppleScript approach - proper Trash with "Put Back" 
support - script = f''' - tell application "Finder" - delete POSIX file "{target}" - end tell - ''' - result = subprocess.run( - ['osascript', '-e', script], - capture_output=True, - text=True, - timeout=60 - ) - - if result.returncode == 0: - # Record the decision for learning - _record_removal(target.name, 'file' if target.is_file() else 'directory', size) + try: + if stat.S_ISLNK(target.lstat().st_mode): + return { + 'success': False, + 'message': f'Refusing to trash symlink: {path}', + 'bytes_freed': 0, + 'bytes_freed_human': format_size(0), + 'dry_run': False, + } + except FileNotFoundError: return { - 'success': True, - 'message': f'Moved to Trash: {path}', - 'bytes_freed': size, - 'bytes_freed_human': format_size(size), + 'success': False, + 'message': f'Path does not exist: {path}', + 'bytes_freed': 0, + 'bytes_freed_human': format_size(0), 'dry_run': False, - 'recoverable': True, } - else: - # Fallback: manual move to ~/.Trash - return _fallback_trash(target, size) - except subprocess.TimeoutExpired: - return _fallback_trash(target, size) + if send2trash is None: + return _fallback_trash(target, size, is_file) + + send2trash.send2trash(str(target)) + _record_removal(target.name, 'file' if is_file else 'directory', size) + return { + 'success': True, + 'message': f'Moved to Trash: {path}', + 'bytes_freed': size, + 'bytes_freed_human': format_size(size), + 'dry_run': False, + 'recoverable': True, + } except Exception as e: + logging.warning(f"Failed to move {target} to Trash: {e}") return { 'success': False, 'message': f'Failed to trash: {e}', 'bytes_freed': 0, + 'bytes_freed_human': format_size(0), 'dry_run': False, } -def _fallback_trash(target: Path, size: int) -> dict: +def _fallback_trash(target: Path, size: int, is_file: bool) -> dict: """Fallback: manually move to ~/.Trash with timestamp to avoid conflicts.""" trash_dir = Path.home() / '.Trash' timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') @@ -109,9 +199,28 @@ def _fallback_trash(target: Path, 
size: int) -> dict: trash_path = trash_dir / trash_name try: + try: + if stat.S_ISLNK(target.lstat().st_mode): + return { + 'success': False, + 'message': f'Refusing to trash symlink: {target}', + 'bytes_freed': 0, + 'bytes_freed_human': format_size(0), + 'dry_run': False, + } + except FileNotFoundError: + return { + 'success': False, + 'message': f'Path does not exist: {target}', + 'bytes_freed': 0, + 'bytes_freed_human': format_size(0), + 'dry_run': False, + } + + trash_dir.mkdir(parents=True, exist_ok=True) shutil.move(str(target), str(trash_path)) # Record the decision for learning - _record_removal(target.name, 'file' if target.is_file() else 'directory', size) + _record_removal(target.name, 'file' if is_file else 'directory', size) return { 'success': True, 'message': f'Moved to Trash: {target.name} (as {trash_name})', @@ -122,10 +231,12 @@ def _fallback_trash(target: Path, size: int) -> dict: 'trash_path': str(trash_path), } except Exception as e: + logging.warning(f"Fallback trash move failed for {target}: {e}") return { 'success': False, 'message': f'Failed to move to Trash: {e}', 'bytes_freed': 0, + 'bytes_freed_human': format_size(0), 'dry_run': False, } @@ -141,6 +252,15 @@ def trash_contents(directory: str, dry_run: bool = False) -> dict: """ target = Path(directory).expanduser() + if target.is_symlink(): + return { + 'success': False, + 'message': f'Refusing to trash contents of symlink root: {directory}', + 'bytes_freed': 0, + 'items_trashed': 0, + 'dry_run': dry_run, + } + if not target.exists(): return { 'success': False, @@ -175,6 +295,9 @@ def trash_contents(directory: str, dry_run: bool = False) -> dict: } for item in items: + if item.is_symlink(): + errors.append(f'Skipped symlink: {item}') + continue result = move_to_trash(str(item), dry_run=dry_run) if result['success']: total_size += result.get('bytes_freed', 0) @@ -230,50 +353,197 @@ def trash_app(app_name: str, dry_run: bool = False) -> dict: try: from .preferences import record_decision 
record_decision('app', app_name.replace('.app', ''), 'removed', result.get('bytes_freed', 0)) - except Exception: - pass + except Exception as e: + logging.warning(f"Failed to record app removal decision: {e}") return result -def safe_cleanup(command: str, description: str, dry_run: bool = False) -> dict: +def _safe_cleanup_single(command: str, description: str, category: str = 'manual', dry_run: bool = False) -> dict: + """Run safe cleanup for a single command without shell chaining.""" + from .stats import record_cleanup, run_cleanup + + try: + tokens = shlex.split(command) + except ValueError as e: + logging.warning(f"Failed to parse cleanup command '{command}': {e}") + return { + 'success': False, + 'bytes_freed': 0, + 'bytes_freed_human': format_size(0), + 'error': str(e), + 'command': command, + 'recorded': False, + } + + if not tokens: + return { + 'success': False, + 'bytes_freed': 0, + 'bytes_freed_human': format_size(0), + 'error': 'Empty command', + 'command': command, + 'recorded': False, + } + + if tokens[0] != 'rm': + if dry_run: + return { + 'success': True, + 'message': f'[DRY RUN] Would run: {command}', + 'bytes_freed': 0, + 'bytes_freed_human': format_size(0), + 'dry_run': True, + 'recoverable': False, + 'note': 'This command cannot be converted to a Trash operation', + 'error': None, + 'command': command, + 'recorded': False, + } + return run_cleanup(command, description, category) + + targets: list[str] = [] + force_mode = False + parsing_options = True + for token in tokens[1:]: + if parsing_options and token == '--': + parsing_options = False + continue + if parsing_options and token.startswith('-'): + option_flags = token.lstrip('-') + if 'f' in option_flags: + force_mode = True + continue + targets.append(token) + + if not targets: + return { + 'success': False, + 'bytes_freed': 0, + 'bytes_freed_human': format_size(0), + 'error': 'No rm targets provided', + 'command': command, + 'recorded': False, + } + + total_size = 0 + recoverable = True 
+ had_errors = False + error_messages: list[str] = [] + + def _is_missing_path_message(message: str | None) -> bool: + if not message: + return False + return ( + message.startswith('Path does not exist:') + or message.startswith('Directory does not exist:') + ) + + def _expand_target(token: str) -> list[str]: + expanded = str(Path(token).expanduser()) if token.startswith('~') else token + if any(ch in expanded for ch in ['*', '?', '[']): + import glob + return glob.glob(expanded) + return [expanded] + + for token in targets: + expanded_token = str(Path(token).expanduser()) if token.startswith('~') else token + + if expanded_token.endswith('/*') and not any(ch in expanded_token[:-2] for ch in ['*', '?', '[']): + dir_path = expanded_token[:-2] + result = trash_contents(dir_path, dry_run=dry_run) + total_size += result.get('bytes_freed', 0) + if not result.get('success', False): + if force_mode and _is_missing_path_message(result.get('message')): + continue + had_errors = True + if result.get('message'): + error_messages.append(result['message']) + if result.get('errors'): + error_messages.extend(result['errors']) + continue + + expanded_targets = _expand_target(token) + for resolved in expanded_targets: + result = move_to_trash(resolved, dry_run=dry_run) + total_size += result.get('bytes_freed', 0) + if not result.get('success', False): + if force_mode and _is_missing_path_message(result.get('message')): + continue + had_errors = True + if result.get('message'): + error_messages.append(result['message']) + + if any(ch in expanded_token for ch in ['*', '?', '[']) and not expanded_targets: + logging.warning(f"Cleanup glob had no matches: {expanded_token}") + + success = not had_errors + if not dry_run and success and total_size > 0: + record_cleanup(description, total_size, category) + + return { + 'success': success, + 'bytes_freed': total_size, + 'bytes_freed_human': format_size(total_size), + 'error': None if success else '; '.join(error_messages) if error_messages 
else 'Cleanup failed', + 'command': command, + 'recorded': (not dry_run and success and total_size > 0), + 'dry_run': dry_run, + 'recoverable': recoverable, + } + + +def safe_cleanup(command: str, description: str, category: str = 'manual', dry_run: bool = False) -> dict: """Convert dangerous rm commands to safe Trash operations when possible. Args: command: The cleanup command (may be rm -rf or other) description: Human-readable description + category: Cleanup category for stats tracking dry_run: If True, only show what would be done Returns: dict with cleanup results """ - # Parse common rm -rf patterns and convert to safe operations - import re - - # Pattern: rm -rf ~/path/* or rm -rf /path/* - match = re.match(r'^rm\s+-rf?\s+([~\w/\\\s.-]+)/\*\s*$', command) - if match: - path = match.group(1).strip() - return trash_contents(path, dry_run=dry_run) - - # Pattern: rm -rf ~/path or rm -rf /path (whole directory) - match = re.match(r'^rm\s+-rf?\s+([~\w/\\\s.-]+)\s*$', command) - if match: - path = match.group(1).strip() - return move_to_trash(path, dry_run=dry_run) - - # For non-rm commands (npm cache clean, docker prune, etc.), - # we can't safely intercept - just report what would happen - if dry_run: + sub_commands = [part.strip() for part in command.split('&&') if part.strip()] + if not sub_commands: return { - 'success': True, - 'message': f'[DRY RUN] Would run: {command}', + 'success': False, 'bytes_freed': 0, - 'dry_run': True, + 'bytes_freed_human': format_size(0), + 'error': 'Empty command', + 'command': command, + 'recorded': False, + 'dry_run': dry_run, 'recoverable': False, - 'note': 'This command cannot be converted to a Trash operation', } - # For actual execution of non-rm commands, use the original run_cleanup - from .stats import run_cleanup - return run_cleanup(command, description) + if len(sub_commands) == 1: + return _safe_cleanup_single(sub_commands[0], description, category, dry_run=dry_run) + + total_size = 0 + had_errors = False + 
error_messages: list[str] = [] + recorded = False + recoverable = True + + for sub_command in sub_commands: + result = _safe_cleanup_single(sub_command, description, category, dry_run=dry_run) + total_size += result.get('bytes_freed', 0) + recorded = recorded or result.get('recorded', False) + recoverable = recoverable and result.get('recoverable', False) + + if not result.get('success', False): + had_errors = True + error_messages.append(f"{sub_command}: {result.get('error') or 'Cleanup failed'}") + + return { + 'success': not had_errors, + 'bytes_freed': total_size, + 'bytes_freed_human': format_size(total_size), + 'error': None if not had_errors else '; '.join(error_messages), + 'command': command, + 'recorded': recorded, + 'dry_run': dry_run, + 'recoverable': recoverable, + } diff --git a/space_hog/scanners.py b/space_hog/scanners.py index abaa49d..eb551dd 100644 --- a/space_hog/scanners.py +++ b/space_hog/scanners.py @@ -1,6 +1,8 @@ """File system scanners for Space Hog.""" import hashlib +import logging +import os from collections import defaultdict from pathlib import Path from typing import Generator @@ -12,36 +14,70 @@ def find_large_files(root: Path, min_size_mb: int = 100) -> Generator[FileInfo, None, None]: """Find files larger than min_size_mb.""" min_size = min_size_mb * 1024 * 1024 + + def _walk_error(error: OSError): + logging.warning(f"Failed to scan path {getattr(error, 'filename', root)}: {error}") + try: - for entry in root.rglob('*'): - if entry.is_file() and not entry.is_symlink(): + for dirpath, dirnames, filenames in os.walk(root, topdown=True, followlinks=False, onerror=_walk_error): + dirnames[:] = [ + d for d in dirnames + if not os.path.islink(os.path.join(dirpath, d)) + ] + for filename in filenames: + entry = Path(dirpath) / filename + if entry.is_symlink(): + continue try: size = entry.stat().st_size if size >= min_size: yield FileInfo(entry, size) - except (PermissionError, OSError): - pass - except (PermissionError, OSError): - pass 
+ except (PermissionError, OSError) as e: + logging.warning(f"Failed to inspect file {entry}: {e}") + except (PermissionError, OSError) as e: + logging.warning(f"Failed to scan for large files in {root}: {e}") def find_space_hogs(root: Path, min_size_mb: int = 50) -> list[tuple[Path, int, str]]: """Find common space-hogging directories.""" results = [] min_size = min_size_mb * 1024 * 1024 + pattern_names = set(SPACE_HOG_PATTERNS.keys()) - for pattern, description in SPACE_HOG_PATTERNS.items(): - try: - for entry in root.rglob(pattern): - if entry.is_dir(): - try: - size = get_dir_size(entry) - if size >= min_size: - results.append((entry, size, description)) - except (PermissionError, OSError): - pass - except (PermissionError, OSError): - pass + def _walk_error(error: OSError): + logging.warning(f"Skipping inaccessible path {getattr(error, 'filename', root)}: {error}") + + try: + # Handle root itself if it matches a known hog pattern. + if root.name in pattern_names and not root.is_symlink(): + try: + size = get_dir_size(root) + if size >= min_size: + results.append((root, size, SPACE_HOG_PATTERNS[root.name])) + except (PermissionError, OSError) as e: + logging.warning(f"Failed to size potential space hog {root}: {e}") + + for dirpath, dirnames, _ in os.walk(root, topdown=True, followlinks=False, onerror=_walk_error): + # Prevent traversal into symlinked directories. + dirnames[:] = [ + d for d in dirnames + if not os.path.islink(os.path.join(dirpath, d)) + ] + + # Prune only matched hog directories so sibling branches are still walked. 
+            for dirname in list(dirnames):
+                if dirname not in pattern_names:
+                    continue
+                current = Path(dirpath) / dirname
+                try:
+                    size = get_dir_size(current)
+                    if size >= min_size:
+                        results.append((current, size, SPACE_HOG_PATTERNS[dirname]))
+                    dirnames.remove(dirname)
+                except (PermissionError, OSError) as e:
+                    logging.warning(f"Failed to size potential space hog {current}: {e}")
+    except (PermissionError, OSError) as e:
+        logging.warning(f"Failed to walk for space hogs in {root}: {e}")
 
     return sorted(results, key=lambda x: x[1], reverse=True)
@@ -52,17 +88,28 @@ def find_duplicates(root: Path, min_size_mb: int = 10) -> dict[str, list[Path]]:
 
     # Group by size first
     size_groups = defaultdict(list)
+
+    def _walk_error(error: OSError):
+        logging.warning(f"Failed to scan path {getattr(error, 'filename', root)}: {error}")
+
     try:
-        for entry in root.rglob('*'):
-            if entry.is_file() and not entry.is_symlink():
+        for dirpath, dirnames, filenames in os.walk(root, topdown=True, followlinks=False, onerror=_walk_error):
+            dirnames[:] = [
+                d for d in dirnames
+                if not os.path.islink(os.path.join(dirpath, d))
+            ]
+            for filename in filenames:
+                entry = Path(dirpath) / filename
+                if entry.is_symlink():
+                    continue
                 try:
                     size = entry.stat().st_size
                     if size >= min_size:
                         size_groups[size].append(entry)
-                except (PermissionError, OSError):
-                    pass
-    except (PermissionError, OSError):
-        pass
+                except (PermissionError, OSError) as e:
+                    logging.warning(f"Failed to inspect file {entry}: {e}")
+    except (PermissionError, OSError) as e:
+        logging.warning(f"Failed to scan for duplicates in {root}: {e}")
 
     # For groups with same size, compute hash
     duplicates = defaultdict(list)
@@ -74,8 +121,8 @@ def find_duplicates(root: Path, min_size_mb: int = 10) -> dict[str, list[Path]]:
             try:
                 file_hash = hash_file(filepath)
                 duplicates[file_hash].append(filepath)
-            except (PermissionError, OSError):
-                pass
+            except (PermissionError, OSError) as e:
+                logging.warning(f"Failed to hash file {filepath}: {e}")
 
     # Filter to only actual duplicates
     return {h: files for h, files in duplicates.items() if len(files) > 1}
diff --git a/space_hog/stats.py b/space_hog/stats.py
index 3a5c793..746ae30 100644
--- a/space_hog/stats.py
+++ b/space_hog/stats.py
@@ -4,6 +4,7 @@
 """
 
 import json
+import shlex
 import shutil
 import subprocess
 from datetime import datetime
@@ -72,7 +73,7 @@ def record_cleanup(description: str, bytes_freed: int, category: str = 'manual')
     return cleanup_record
 
 
-def run_cleanup(command: str, description: str, category: str = 'manual') -> dict:
+def run_cleanup(command: str | list[str], description: str, category: str = 'manual') -> dict:
     """Run a cleanup command and measure actual space freed.
 
     This is the CORRECT way to run cleanups - it measures before/after
@@ -86,23 +87,36 @@ def run_cleanup(command: str, description: str, category: str = 'manual') -> dic
     # Measure before
     free_before = shutil.disk_usage("/").free
 
+    # Run the command without a shell to avoid command injection.
+    command_list = shlex.split(command) if isinstance(command, str) else list(command)
+    if not command_list:
+        return {
+            'success': False,
+            'bytes_freed': 0,
+            'bytes_freed_human': '0 B',
+            'error': 'Empty command',
+            'command': command,
+        }
+
+    command_display = command if isinstance(command, str) else ' '.join(command_list)
+
     # Run the command
     try:
         result = subprocess.run(
-            command,
-            shell=True,
+            command_list,
+            shell=False,
             capture_output=True,
             text=True,
             timeout=300  # 5 minute timeout
         )
-        success = result.returncode == 0 or 'no matches found' in result.stderr
+        success = result.returncode == 0 or 'no matches found' in (result.stderr or '')
     except subprocess.TimeoutExpired:
         return {
             'success': False,
            'bytes_freed': 0,
            'bytes_freed_human': '0 B',
             'error': 'Command timed out',
-            'command': command,
+            'command': command_display,
         }
     except Exception as e:
         return {
@@ -110,7 +124,7 @@ def run_cleanup(command: str, description: str, category: str = 'manual') -> dic
             'success': False,
             'bytes_freed': 0,
             'bytes_freed_human': '0 B',
             'error': str(e),
-            'command': command,
+            'command': command_display,
         }
 
     # Measure after
@@ -126,7 +140,7 @@
         'bytes_freed': max(0, bytes_freed),
         'bytes_freed_human': format_size(max(0, bytes_freed)),
         'error': None if success else result.stderr,
-        'command': command,
+        'command': command_display,
         'recorded': bytes_freed > 0,
     }
diff --git a/space_hog/utils.py b/space_hog/utils.py
index 3d77563..fc69f70 100644
--- a/space_hog/utils.py
+++ b/space_hog/utils.py
@@ -1,6 +1,8 @@
 """Utility functions for Space Hog."""
 
 from dataclasses import dataclass
+import logging
+import os
 from pathlib import Path
@@ -27,15 +29,38 @@ def format_size(size_bytes: int) -> str:
 
 def get_dir_size(path: Path) -> int:
     """Calculate total size of a directory."""
     total = 0
+
+    if not path.exists():
+        return 0
+
+    if path.is_file():
+        try:
+            return path.stat().st_size
+        except (PermissionError, OSError) as e:
+            logging.warning(f"Failed to stat file {path}: {e}")
+            return 0
+
+    def _walk_error(error: OSError):
+        logging.warning(f"Failed to access {getattr(error, 'filename', path)}: {error}")
+
     try:
-        for entry in path.rglob('*'):
-            if entry.is_file() and not entry.is_symlink():
+        for root, dirnames, filenames in os.walk(path, topdown=True, followlinks=False, onerror=_walk_error):
+            # Prune symlinked directories defensively.
+            dirnames[:] = [
+                dirname
+                for dirname in dirnames
+                if not (Path(root) / dirname).is_symlink()
+            ]
+
+            for filename in filenames:
+                entry = Path(root) / filename
                 try:
-                    total += entry.stat().st_size
-                except (PermissionError, OSError):
-                    pass
-    except (PermissionError, OSError):
-        pass
+                    if not entry.is_symlink():
+                        total += entry.stat().st_size
+                except (PermissionError, OSError) as e:
+                    logging.warning(f"Failed to stat file {entry}: {e}")
+    except (PermissionError, OSError) as e:
+        logging.warning(f"Failed to walk directory {path}: {e}")
 
     return total
diff --git a/tests/test_docker.py b/tests/test_docker.py
index 0788424..2ed5442 100644
--- a/tests/test_docker.py
+++ b/tests/test_docker.py
@@ -1,6 +1,9 @@
 """Tests for space_hog.docker module."""
 
-from space_hog.docker import _parse_size
+import json
+import subprocess
+
+from space_hog.docker import _parse_size, _sanitize_label_text, analyze_docker_volumes, print_docker_analysis
 
 
 class TestParseSize:
@@ -42,3 +45,72 @@ def test_kib_mib_gib(self):
 
     def test_plain_number(self):
         assert _parse_size("12345") == 12345
+
+
+class TestDockerLabelSafety:
+    """Tests for Docker label sanitization and command quoting."""
+
+    def test_sanitize_label_text_uses_strict_allowlist(self):
+        assert _sanitize_label_text("safe.project-1") == "safe.project-1"
+        assert _sanitize_label_text("proj\x00name\x1f") is None
+        assert _sanitize_label_text("my project;$(rm -rf /)|x") is None
+        assert _sanitize_label_text("\n\t ;()$|") is None
+
+    def test_analyze_docker_volumes_sanitizes_project(self, monkeypatch):
+        payload = {
+            "Volumes": [
+                {
+                    "Name": "vol1",
+                    "Labels": "com.docker.compose.project=proj\x00evil",
+                    "Size": "1KB",
+                    "Links": "0",
+                    "Driver": "local",
+                }
+            ]
+        }
+
+        def fake_run(*args, **kwargs):
+            return subprocess.CompletedProcess(args[0], 0, stdout=json.dumps(payload), stderr="")
+
+        monkeypatch.setattr("space_hog.docker.subprocess.run", fake_run)
+        volumes = analyze_docker_volumes()
+
+        assert len(volumes) == 1
+        assert volumes[0]["project"] is None
+
+    def test_print_docker_analysis_quotes_project_name(self, monkeypatch, capsys):
+        monkeypatch.setattr(
+            "space_hog.docker.analyze_docker",
+            lambda: {
+                "installed": True,
+                "running": True,
+                "vm_disk_path": None,
+                "vm_disk_allocated": 0,
+                "vm_disk_used": 0,
+                "vm_disk_bloat": 0,
+                "images": {"count": 0, "size": 0, "reclaimable": 0},
+                "containers": {"count": 0, "size": 0, "reclaimable": 0},
+                "volumes": {"count": 1, "size": 1024, "reclaimable": 1024},
+                "build_cache": {"size": 0, "reclaimable": 0},
+                "total_usage": 1024,
+                "total_reclaimable": 1024,
+            },
+        )
+        monkeypatch.setattr(
+            "space_hog.docker.analyze_docker_volumes",
+            lambda: [
+                {
+                    "name": "vol1",
+                    "project": "myproject",
+                    "size": 1024,
+                    "size_human": "1.0 KB",
+                    "links": 0,
+                    "in_use": False,
+                    "driver": "local",
+                }
+            ],
+        )
+
+        print_docker_analysis()
+        out = capsys.readouterr().out
+        assert "label=com.docker.compose.project=myproject" in out
diff --git a/tests/test_memory.py b/tests/test_memory.py
new file mode 100644
index 0000000..c24e0f4
--- /dev/null
+++ b/tests/test_memory.py
@@ -0,0 +1,48 @@
+"""Tests for space_hog.memory module."""
+
+import subprocess
+
+from space_hog.memory import remove_login_item
+
+
+def test_remove_login_item_rejects_invalid_applescript_input(monkeypatch):
+    called = {"run": False}
+
+    def fake_run(*args, **kwargs):
+        called["run"] = True
+        return subprocess.CompletedProcess(args[0], 0, stdout="", stderr="")
+
+    monkeypatch.setattr("space_hog.memory.subprocess.run", fake_run)
+
+    result = remove_login_item('Bad"; do shell script "rm -rf /" --')
+
+    assert result == {"success": False, "error": "Invalid app name"}
+    assert called["run"] is False
+
+
+def test_remove_login_item_accepts_valid_name(monkeypatch):
+    calls = {}
+
+    def fake_run(*args, **kwargs):
+        calls["args"] = args
+        return subprocess.CompletedProcess(args[0], 0, stdout="", stderr="")
+
+    monkeypatch.setattr("space_hog.memory.subprocess.run", fake_run)
+    result = remove_login_item("Safe_App 123")
+
+    assert result["success"] is True
+    assert calls["args"][0][2].endswith('login item "Safe_App 123"')
+
+
+def test_remove_login_item_accepts_apostrophe_and_ampersand(monkeypatch):
+    calls = {}
+
+    def fake_run(*args, **kwargs):
+        calls["args"] = args
+        return subprocess.CompletedProcess(args[0], 0, stdout="", stderr="")
+
+    monkeypatch.setattr("space_hog.memory.subprocess.run", fake_run)
+    result = remove_login_item("Bob's App & Co")
+
+    assert result["success"] is True
+    assert calls["args"][0][2].endswith('login item "Bob\'s App & Co"')
diff --git a/tests/test_safe_delete.py b/tests/test_safe_delete.py
new file mode 100644
index 0000000..1c9886d
--- /dev/null
+++ b/tests/test_safe_delete.py
@@ -0,0 +1,197 @@
+"""Tests for space_hog.safe_delete module."""
+
+import tempfile
+from pathlib import Path
+from types import SimpleNamespace
+
+import space_hog.safe_delete as safe_delete
+from space_hog.safe_delete import move_to_trash, safe_cleanup, trash_contents
+
+
+class TestTrashContents:
+    """Tests for trash_contents function."""
+
+    def test_skips_symlink_items(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            root = Path(tmpdir)
+            real_dir = root / "real"
+            real_dir.mkdir()
+            (real_dir / "file.txt").write_text("x")
+
+            link_dir = root / "link"
+            link_dir.symlink_to(real_dir, target_is_directory=True)
+
+            result = trash_contents(str(root), dry_run=True)
+
+            assert result["items_trashed"] == 1
+            assert result["success"] is False
+            assert result["errors"] is not None
+            assert any("Skipped symlink" in err for err in result["errors"])
+
+    def test_rejects_symlink_root_directory(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            root = Path(tmpdir)
+            real_dir = root / "real"
+            real_dir.mkdir()
+            link_root = root / "root-link"
+            link_root.symlink_to(real_dir, target_is_directory=True)
+
+            result = trash_contents(str(link_root), dry_run=True)
+
+            assert result["success"] is False
+            assert "symlink root" in result["message"].lower()
+
+
+class TestMoveToTrash:
+    """Tests for move_to_trash function."""
+
+    def test_uses_send2trash(self, monkeypatch):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            monkeypatch.setattr(safe_delete.Path, "home", lambda: Path(tmpdir))
+            target = Path(tmpdir) / "sample.txt"
+            target.write_text("data")
+            called = {}
+            recorded = {}
+
+            def fake_send2trash(path):
+                called["path"] = path
+
+            def fake_record(item_name, item_type, size_bytes):
+                recorded["item_name"] = item_name
+                recorded["item_type"] = item_type
+                recorded["size_bytes"] = size_bytes
+
+            monkeypatch.setattr(
+                safe_delete,
+                "send2trash",
+                SimpleNamespace(send2trash=fake_send2trash),
+            )
+            monkeypatch.setattr(safe_delete, "_record_removal", fake_record)
+            result = move_to_trash(str(target))
+
+            assert result["success"] is True
+            assert called["path"] == str(target)
+            assert recorded["item_name"] == "sample.txt"
+            assert recorded["item_type"] == "file"
+
+    def test_rejects_symlink(self, monkeypatch):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            monkeypatch.setattr(safe_delete.Path, "home", lambda: Path(tmpdir))
+            root = Path(tmpdir)
+            real = root / "real.txt"
+            real.write_text("data")
+            symlink = root / "link.txt"
+            symlink.symlink_to(real)
+
+            called = {"send2trash": False}
+
+            def fake_send2trash(path):
+                called["send2trash"] = True
+
+            monkeypatch.setattr(
+                safe_delete,
+                "send2trash",
+                SimpleNamespace(send2trash=fake_send2trash),
+            )
+
+            result = move_to_trash(str(symlink))
+            assert result["success"] is False
+            assert "symlink" in result["message"].lower()
+            assert called["send2trash"] is False
+
+    def test_rejects_outside_safe_bases(self):
+        result = move_to_trash("/etc")
+        assert result["success"] is False
+        assert "outside safe locations" in result["message"].lower()
+
+    def test_rejects_root_path(self):
+        result = move_to_trash("/")
+        assert result["success"] is False
+        assert "outside safe locations" in result["message"].lower()
+
+
+class TestSafeCleanup:
"""Tests for safe_cleanup dispatcher behavior.""" + + def test_rm_glob_uses_trash_contents_and_records_stats(self, monkeypatch): + calls = {"trash_contents": None, "record_cleanup": None} + + def fake_trash_contents(directory, dry_run=False): + calls["trash_contents"] = (directory, dry_run) + return { + "success": True, + "message": "ok", + "bytes_freed": 1234, + "bytes_freed_human": "1.2 KB", + "items_trashed": 1, + "errors": None, + "dry_run": dry_run, + } + + def fake_record_cleanup(description, bytes_freed, category="manual"): + calls["record_cleanup"] = (description, bytes_freed, category) + + monkeypatch.setattr(safe_delete, "trash_contents", fake_trash_contents) + monkeypatch.setattr("space_hog.stats.record_cleanup", fake_record_cleanup) + + result = safe_cleanup( + command="rm -rf ~/Library/Caches/*", + description="Caches", + category="cache", + ) + + assert result["success"] is True + assert calls["trash_contents"] == (str(Path("~/Library/Caches").expanduser()), False) + assert calls["record_cleanup"] == ("Caches", 1234, "cache") + + def test_non_rm_delegates_to_run_cleanup(self, monkeypatch): + calls = {} + + def fake_run_cleanup(command, description, category="manual"): + calls["args"] = (command, description, category) + return {"success": True, "bytes_freed": 0, "bytes_freed_human": "0.0 B", "error": None, "command": command, "recorded": False} + + monkeypatch.setattr("space_hog.stats.run_cleanup", fake_run_cleanup) + + result = safe_cleanup("docker system prune -a", "Docker", category="docker") + assert result["success"] is True + assert calls["args"] == ("docker system prune -a", "Docker", "docker") + + def test_chained_commands_run_all_parts(self, monkeypatch): + calls = {"run": [], "move": []} + + def fake_run_cleanup(command, description, category="manual"): + calls["run"].append((command, description, category)) + return { + "success": True, + "bytes_freed": 20, + "bytes_freed_human": "20.0 B", + "error": None, + "command": command, + 
"recorded": False, + "recoverable": False, + } + + def fake_move_to_trash(path, dry_run=False): + calls["move"].append((path, dry_run)) + return { + "success": True, + "message": "ok", + "bytes_freed": 10, + "bytes_freed_human": "10.0 B", + "dry_run": dry_run, + } + + monkeypatch.setattr("space_hog.stats.run_cleanup", fake_run_cleanup) + monkeypatch.setattr(safe_delete, "move_to_trash", fake_move_to_trash) + + result = safe_cleanup( + "npm cache clean --force && rm -rf ~/.npm/_npx/*", + "NPM cache", + category="cache", + ) + + assert result["success"] is True + assert result["bytes_freed"] == 30 + assert calls["run"] == [("npm cache clean --force", "NPM cache", "cache")] + assert calls["move"] == [(str(Path("~/.npm/_npx/*").expanduser()), False)] diff --git a/tests/test_scanners.py b/tests/test_scanners.py index 33997dd..ac6e3bc 100644 --- a/tests/test_scanners.py +++ b/tests/test_scanners.py @@ -70,6 +70,36 @@ def test_finds_git_directory(self): hog_names = [h[2] for h in hogs] assert "Git repositories" in hog_names + def test_skips_symlinked_directories(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + real_node_modules = root / "real_node_modules" + real_node_modules.mkdir() + (real_node_modules / "file.txt").write_text("x" * 1000) + + symlinked = root / "project" / "node_modules" + symlinked.parent.mkdir() + symlinked.symlink_to(real_node_modules, target_is_directory=True) + + hogs = find_space_hogs(root, min_size_mb=0) + hog_paths = [h[0] for h in hogs] + assert symlinked not in hog_paths + + def test_finds_space_hogs_in_independent_branches(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + node_modules = root / "project_a" / "node_modules" + git_dir = root / "project_b" / ".git" + node_modules.mkdir(parents=True) + git_dir.mkdir(parents=True) + (node_modules / "dep").write_text("x" * 5000) + (git_dir / "obj").write_text("x" * 5000) + + hogs = find_space_hogs(root, min_size_mb=0) + hog_descriptions = 
[h[2] for h in hogs] + assert "Node.js dependencies" in hog_descriptions + assert "Git repositories" in hog_descriptions + class TestHashFile: """Tests for hash_file function.""" diff --git a/tests/test_stats.py b/tests/test_stats.py new file mode 100644 index 0000000..21fd62e --- /dev/null +++ b/tests/test_stats.py @@ -0,0 +1,28 @@ +"""Tests for space_hog.stats module.""" + +import subprocess +from types import SimpleNamespace + +from space_hog.stats import run_cleanup + + +def test_run_cleanup_executes_without_shell(monkeypatch): + calls = {} + + def fake_run(*args, **kwargs): + calls["args"] = args + calls["kwargs"] = kwargs + return subprocess.CompletedProcess(args[0], 0, stdout="", stderr="") + + monkeypatch.setattr("space_hog.stats.subprocess.run", fake_run) + monkeypatch.setattr( + "space_hog.stats.shutil.disk_usage", + lambda _: SimpleNamespace(total=1, used=1, free=1), + ) + + result = run_cleanup("echo hello", "test cleanup") + + assert result["success"] is True + assert calls["args"][0] == ["echo", "hello"] + assert calls["kwargs"]["shell"] is False +
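As a standalone sanity check of the `shell=False` approach the stats.py change relies on (not part of the patch itself, and assuming a POSIX-style `echo` on the PATH), the sketch below shows that `shlex.split` tokenizes a hostile command string without giving shell metacharacters any meaning, so an embedded `; rm -rf ...` never reaches a shell:

```python
import shlex
import subprocess

# An injection-style command string of the kind run_cleanup might receive.
command = "echo hello; rm -rf /tmp/victim"

# With punctuation_chars left at its default, shlex.split does not treat
# ';' specially: it stays embedded in an ordinary token.
argv = shlex.split(command)
print(argv)  # ['echo', 'hello;', 'rm', '-rf', '/tmp/victim']

# Passing the list with shell=False means no /bin/sh re-parses the
# arguments, so the "rm -rf" tail is just literal text echoed back.
result = subprocess.run(argv, capture_output=True, text=True, shell=False)
print(result.stdout.strip())  # hello; rm -rf /tmp/victim
```

The trade-off is that strings which relied on shell features (globs, `&&` chaining, `~` expansion) no longer work when passed straight through, which is why the patch routes those cases through `safe_cleanup`'s own splitting and `Path.expanduser` instead.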