diff --git a/tests/platform/test_lab_runtime.py b/tests/platform/test_lab_runtime.py index 720308b..a744a22 100644 --- a/tests/platform/test_lab_runtime.py +++ b/tests/platform/test_lab_runtime.py @@ -39,6 +39,16 @@ def test_execute_lab_verifier_rejects_shell_only_lab(tmp_path: Path) -> None: assert result.error == "No verify.py for lab 9.8" +def test_execute_lab_verifier_rejects_lab_path_escape(tmp_path: Path) -> None: + labs_root = tmp_path / "labs" + labs_root.mkdir() + + result = execute_lab_verifier("../escape", labs_root=labs_root) + + assert result.passed is False + assert result.error == "Invalid lab path for lab ../escape" + + def test_load_lab_manifest_supports_response_phase_shape() -> None: manifest = load_lab_manifest( Path("labs/tier-7-detection-response/7.5-threat-modeling") diff --git a/weaklink_platform/lab_runtime.py b/weaklink_platform/lab_runtime.py index 8895356..738c8a4 100644 --- a/weaklink_platform/lab_runtime.py +++ b/weaklink_platform/lab_runtime.py @@ -4,6 +4,7 @@ import base64 import json import os +import re import shlex import subprocess from dataclasses import dataclass, field @@ -19,6 +20,7 @@ DEFAULT_REPOS_ROOT = Path("/repos") DEFAULT_WORKSPACE_ROOT = Path("/workspace") DEFAULT_LABS_ROOT = Path("/opt/labs") +LAB_ID_PATTERN = re.compile(r"\d+\.\d+") def _package_root() -> Path: @@ -226,6 +228,12 @@ def read_env_exports(path: Path) -> dict[str, str]: return env +def _validated_lab_id(lab_id: str) -> str: + if not LAB_ID_PATTERN.fullmatch(lab_id): + raise ValueError(f"Invalid lab id: {lab_id}") + return lab_id + + def main_init(callback: InitHook, argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(add_help=False) parser.add_argument("--json", action="store_true") @@ -386,10 +394,22 @@ def execute_lab_verifier( labs_root: Path = DEFAULT_LABS_ROOT, timeout: int = 30, ) -> VerificationResult: - resolved_lab_dir = lab_dir or (labs_root / lab_id) - env = _verifier_env(lab_id, resolved_lab_dir) + try: + resolved_labs_root = labs_root.resolve(strict=False) + safe_lab_id = _validated_lab_id(lab_id) + expected_lab_dir = (resolved_labs_root / safe_lab_id).resolve(strict=False) + if lab_dir is not None: + provided_lab_dir = lab_dir.resolve(strict=False) + if provided_lab_dir != expected_lab_dir: + raise ValueError(f"Lab directory does not match lab id: {lab_id}") + resolved_lab_dir = expected_lab_dir + resolved_lab_dir.relative_to(resolved_labs_root) + python_verifier = (resolved_lab_dir / "verify.py").resolve(strict=False) + python_verifier.relative_to(resolved_lab_dir) + except ValueError: + return VerificationResult(False, (), error=f"Invalid lab path for lab {lab_id}") - python_verifier = resolved_lab_dir / "verify.py" + env = _verifier_env(lab_id, resolved_lab_dir) try: if python_verifier.exists():