diff --git a/pyproject.toml b/pyproject.toml index a5e72650..c075aaa8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "azlin" -version = "2.6.22" +version = "2.6.23" description = "Azure VM fleet management CLI - provision, manage, and monitor development VMs" requires-python = ">=3.11" authors = [ diff --git a/src/azlin/rust_bridge.py b/src/azlin/rust_bridge.py index 83eeff43..5e238ab4 100644 --- a/src/azlin/rust_bridge.py +++ b/src/azlin/rust_bridge.py @@ -15,19 +15,31 @@ 3. Exit with error """ +import copy +import json import os import platform import shutil import stat import subprocess import sys +import tarfile +import tempfile +import urllib.error import urllib.request -from pathlib import Path +from pathlib import Path, PurePosixPath GITHUB_REPO = "rysweet/azlin" MANAGED_BIN_DIR = Path.home() / ".azlin" / "bin" MANAGED_BIN = MANAGED_BIN_DIR / "azlin" +# Computed once at import time; used to select tar extraction strategy +_PY312_PLUS = sys.version_info >= (3, 12) + + +class SecurityError(Exception): + """Raised when a security violation is detected during binary installation.""" + def _platform_suffix() -> str | None: """Map current platform to GitHub Release asset suffix.""" @@ -80,11 +92,75 @@ def _find_rust_binary() -> str | None: return None +def _is_release_binary_member(name: str) -> bool: + """Return True if the tar member name corresponds to the azlin binary. + + Pure predicate — no I/O. + """ + return name == "azlin" or name.endswith("/azlin") + + +def _validate_release_member(member: tarfile.TarInfo) -> None: + """Raise SecurityError if a tar member is unsafe to extract. + + Checks performed: + - Absolute path (e.g. /etc/cron.d/evil) + - Parent-directory traversal (e.g. ../../usr/bin/evil) + - Non-regular-file on Python < 3.12 (symlinks, device files, hard links) + + On Python >= 3.12 filter='data' handles non-regular-file filtering at + extraction time, so only path checks are required here. + """ + path = PurePosixPath(member.name) + + if path.is_absolute(): + raise SecurityError(f"Absolute path in archive: {member.name!r}") + + if ".." in path.parts: + raise SecurityError(f"Path traversal in archive: {member.name!r}") + + if not _PY312_PLUS: + # filter='data' is unavailable before 3.12 — check member type manually + if not member.isfile(): + raise SecurityError( + f"Non-regular-file in archive: {member.name!r} (type={member.type})" + ) + + +def _extract_release_binary(tmp_path: Path, destination: Path) -> None: + """Extract only the azlin binary from a release tarball. + + Validates every member before extraction and normalises the output + filename to 'azlin' regardless of the member's name in the archive. + + Raises: + SecurityError: If any tar member fails validation or the binary is + absent from the archive. + """ + with tarfile.open(tmp_path, "r:gz") as tar: + for member in tar.getmembers(): + if not _is_release_binary_member(member.name): + continue + + _validate_release_member(member) + + # Normalise name: always write to destination/azlin regardless of + # the member's original name inside the archive (defence-in-depth). + safe_member = copy.copy(member) + safe_member.name = "azlin" + + if _PY312_PLUS: + tar.extract(safe_member, path=str(destination), filter="data") + else: + tar.extract(safe_member, path=str(destination)) + + return # Successfully extracted exactly one binary — done + + raise SecurityError("azlin binary not found in archive") + + def _download_from_release() -> str | None: """Download pre-built binary from GitHub Releases.""" - import tarfile - import tempfile - suffix = _platform_suffix() if not suffix: return None @@ -95,10 +171,8 @@ def _download_from_release() -> str | None: api_url, headers={"Accept": "application/vnd.github+json"} ) # noqa: S310 with urllib.request.urlopen(req, timeout=10) as resp: # noqa: S310 # nosec B310 - import json - releases = json.loads(resp.read()) - except Exception: + except (urllib.error.URLError, OSError): return None # Find the latest Rust release asset for this platform @@ -125,19 +199,15 @@ def _download_from_release() -> str | None: sys.stderr.write( f"azlin: installing Rust binary v{version} from GitHub Releases...\n" ) + tmp_path = None try: with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as tmp: - urllib.request.urlretrieve(download_url, tmp.name) # noqa: S310 # nosec B310 - tmp_path = tmp.name + tmp_path = Path(tmp.name) - with tarfile.open(tmp_path, "r:gz") as tar: - for member in tar.getmembers(): - if member.name.endswith("/azlin") or member.name == "azlin": - member.name = "azlin" - tar.extract(member, path=str(MANAGED_BIN_DIR)) - break + urllib.request.urlretrieve(download_url, str(tmp_path)) # noqa: S310 # nosec B310 - os.unlink(tmp_path) + # SecurityError propagates uncaught — installation is aborted loudly + _extract_release_binary(tmp_path, MANAGED_BIN_DIR) if MANAGED_BIN.exists(): MANAGED_BIN.chmod( @@ -145,8 +215,13 @@ def _download_from_release() -> str | None: ) sys.stderr.write(f"azlin: installed to {MANAGED_BIN}\n") return str(MANAGED_BIN) - except Exception as e: + except (urllib.error.URLError, OSError) as e: sys.stderr.write(f"azlin: download failed: {e}\n") + finally: + # Always clean up the temp file — even if SecurityError is raised + if tmp_path is not None and tmp_path.exists(): + tmp_path.unlink(missing_ok=True) + return None diff --git a/tests/unit/test_rust_bridge.py b/tests/unit/test_rust_bridge.py new file mode 100644 index 00000000..397eede8 --- /dev/null +++ b/tests/unit/test_rust_bridge.py @@ -0,0 +1,229 @@ +"""Tests for rust_bridge.py security hardening — Issue #876. + +These tests define the contract for the new security components: + + SecurityError — typed exception for tar extraction failures + _PY312_PLUS — module-level bool constant + _is_release_binary_member — pure predicate: is this the azlin binary? + _validate_release_member — guard: raises SecurityError on unsafe members + _extract_release_binary — orchestrator: safe extraction to dest dir + +All four tests will FAIL until the implementation adds those symbols. +They will PASS once the security hardening in PR #885 is merged. + +Design spec references: + SEC-R-01 path traversal via PurePosixPath.parts + SEC-R-02 _PY312_PLUS uses tuple comparison sys.version_info >= (3, 12) + SEC-R-03 copy.copy(member) before renaming to avoid mutating TarFile internals + SEC-R-04 extractall() must never appear; use member-level extract +""" + +import io +import tarfile +from pathlib import Path +from unittest.mock import patch + +import pytest + +# --------------------------------------------------------------------------- +# These imports will raise ImportError until the implementation is done. +# That is intentional: every test in this class is a FAILING test until +# the security hardening (Issue #876) has been implemented. +# --------------------------------------------------------------------------- +from azlin.rust_bridge import ( # noqa: E402 — must be after conftest path setup + SecurityError, + _PY312_PLUS, + _extract_release_binary, + _validate_release_member, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_tar_gz(members: list[tuple[str, int, bytes]]) -> bytes: + """Build an in-memory .tar.gz with the given (name, type, content) tuples.""" + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tf: + for name, tar_type, content in members: + info = tarfile.TarInfo(name=name) + info.type = tar_type + info.size = len(content) + tf.addfile(info, io.BytesIO(content)) + return buf.getvalue() + + +def _make_tar_gz_file(tmp_path: Path, members: list[tuple[str, int, bytes]]) -> Path: + """Write a .tar.gz to *tmp_path* and return its path.""" + data = _make_tar_gz(members) + p = tmp_path / "release.tar.gz" + p.write_bytes(data) + return p + + +# --------------------------------------------------------------------------- +# Test class +# --------------------------------------------------------------------------- + + +class TestExtractReleaseBinary: + """4 targeted unit tests covering the new security helpers (Issue #876).""" + + # ------------------------------------------------------------------ + # Test 1: filter='data' is passed to tarfile.extract on Python 3.12+ + # ------------------------------------------------------------------ + + def test_filter_data_used_on_python_312_plus(self, tmp_path: Path) -> None: + """On Python 3.12+, _extract_release_binary must call tar.extract with + filter='data' to leverage the security filter introduced in CPython 3.12. + + On Python < 3.12, the filter keyword is not supported and must be omitted. + + Spec ref: SEC-R-02 (_PY312_PLUS), design component _extract_release_binary. + """ + content = b"#!/bin/sh\necho azlin" + tar_path = _make_tar_gz_file( + tmp_path, + [("azlin", tarfile.REGTYPE, content)], + ) + + if _PY312_PLUS: + # Patch tarfile.TarFile.extract so we can inspect kwargs. + # The real extraction is NOT performed; we only verify the call. + with patch("tarfile.TarFile.extract") as mock_extract: + _extract_release_binary(str(tar_path), tmp_path / "dest") + + assert mock_extract.called, ( + "_extract_release_binary must call tar.extract()" + ) + filter_values = [ + c.kwargs.get("filter") for c in mock_extract.call_args_list + ] + assert "data" in filter_values, ( + "On Python 3.12+, extract() must be called with filter='data'. " + f"Actual filter values seen: {filter_values}" + ) + else: + # On < 3.12: extraction must succeed without the filter kwarg. + dest = tmp_path / "dest" + _extract_release_binary(str(tar_path), dest) + assert (dest / "azlin").exists(), ( + "Binary must be extracted to dest directory" + ) + + # ------------------------------------------------------------------ + # Test 2: path traversal entries are rejected + # ------------------------------------------------------------------ + + def test_path_traversal_rejected(self) -> None: + """_validate_release_member must raise SecurityError for any member whose + PurePosixPath.parts contain '..', blocking directory traversal attacks. + + The check must use pathlib.PurePosixPath(name).parts, NOT a naive string + contains, so that names like 'foo..bar' are NOT falsely rejected. + + Spec ref: SEC-R-01. + """ + from pathlib import PurePosixPath + + traversal_names = [ + "../etc/passwd", + "subdir/../../etc/shadow", + "./../../root/.ssh/authorized_keys", + "../azlin", # looks like a binary name but traverses up + ] + safe_name = "foo..bar" # dots-in-name must NOT be rejected + + for name in traversal_names: + # Verify the name actually has '..' in its PurePosixPath parts + assert ".." in PurePosixPath(name).parts, ( + f"Test fixture error: '{name}' should contain '..' parts" + ) + member = tarfile.TarInfo(name=name) + member.type = tarfile.REGTYPE + with pytest.raises(SecurityError): + _validate_release_member(member) + + # Sanity check: a name with '..' in the *text* but not in *parts* + # must NOT raise SecurityError (it's a valid leaf name). + assert ".." not in PurePosixPath(safe_name).parts + safe_member = tarfile.TarInfo(name=safe_name) + safe_member.type = tarfile.REGTYPE + # Should not raise: + _validate_release_member(safe_member) + + # ------------------------------------------------------------------ + # Test 3: symlink members are rejected on Python < 3.12 + # ------------------------------------------------------------------ + + def test_symlink_rejected_on_pre_312(self) -> None: + """On Python < 3.12, _validate_release_member must raise SecurityError + for symlink (SYMTYPE) and hard-link (LNKTYPE) members, because + filter='data' is not available to handle them at the tarfile level. + + On Python 3.12+, filter='data' provides this guarantee at extraction + time, so _validate_release_member may (but need not) reject them here; + the important invariant is that filter='data' is used (Test 1). + + Spec ref: SEC-R-01 (non-regular-file members), SEC-R-02. + """ + symlink_member = tarfile.TarInfo(name="azlin") + symlink_member.type = tarfile.SYMTYPE + symlink_member.linkname = "/usr/bin/malicious" + + hardlink_member = tarfile.TarInfo(name="azlin") + hardlink_member.type = tarfile.LNKTYPE + hardlink_member.linkname = "/etc/passwd" + + if not _PY312_PLUS: + # Pre-3.12: guard must reject symlinks and hard links + with pytest.raises(SecurityError): + _validate_release_member(symlink_member) + with pytest.raises(SecurityError): + _validate_release_member(hardlink_member) + else: + # 3.12+: filter='data' handles this at extraction; the predicate + # may choose to reject early (defensive), but Test 1 already covers + # the filter='data' requirement. We just document the 3.12+ branch. + pytest.skip( + "On Python 3.12+ symlink rejection is handled by filter='data' " + "(see test_filter_data_used_on_python_312_plus)" + ) + + # ------------------------------------------------------------------ + # Test 4: SecurityError raised when no azlin binary is found in archive + # ------------------------------------------------------------------ + + def test_security_error_raised_when_no_binary_in_archive( + self, tmp_path: Path + ) -> None: + """_extract_release_binary must raise SecurityError when the archive + contains no member matching the azlin binary predicate. + + This prevents silent installation failures where a release asset is + downloaded but the binary is absent (e.g., wrong platform tarball, + corrupted or tampered archive). + + Spec ref: design component _extract_release_binary (raises SecurityError + when no binary found after iterating all members). + """ + # Archive with only a README — no 'azlin' binary + tar_path = _make_tar_gz_file( + tmp_path, + [ + ("README.md", tarfile.REGTYPE, b"# azlin\n"), + ("CHANGELOG.md", tarfile.REGTYPE, b"## v1.0\n"), + ], + ) + + dest = tmp_path / "dest" + with pytest.raises(SecurityError): + _extract_release_binary(str(tar_path), dest) + + # Destination directory must NOT be left with partial content + # (the binary should not exist even if extraction started) + assert not (dest / "azlin").exists(), ( + "azlin binary must not exist in dest when SecurityError is raised" + ) diff --git a/tests/unit/test_rust_bridge_security.py b/tests/unit/test_rust_bridge_security.py new file mode 100644 index 00000000..1428717c --- /dev/null +++ b/tests/unit/test_rust_bridge_security.py @@ -0,0 +1,376 @@ +"""Security tests for rust_bridge.py — binary bootstrapper. + +These tests verify that the tar-extraction path cannot be exploited via: + - Path traversal members (../../etc/passwd) + - Absolute-path members (/etc/cron.d/evil) + - Symlink members (on Python < 3.12) + - Device-file members (on Python < 3.12) + - Archives that contain no azlin binary + +Design spec refs: + SEC-R-01 Validate all tar members before extraction (CRITICAL) + SEC-R-02 filter='data' on Python >= 3.12 (HIGH) + SEC-R-03 Normalise extracted member name to "azlin" (HIGH) + SEC-R-04 Skip-all-but-one; never extractall() (HIGH) + SEC-R-06 Temp file cleanup in finally block (LOW) + SEC-R-07 SecurityError NOT caught by download handler (LOW) +""" + +import io +import sys +import tarfile +import tempfile +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from azlin.rust_bridge import ( + SecurityError, + _extract_release_binary, + _is_release_binary_member, + _validate_release_member, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_tarball(members: list[tuple[str, bytes, str]]) -> Path: + """Write an in-memory tar.gz to a temp file and return its path. + + Each element of *members* is (name, content_bytes, type) where type is + one of: 'file', 'symlink', 'hardlink'. + """ + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + for name, content, member_type in members: + info = tarfile.TarInfo(name=name) + if member_type == "file": + info.type = tarfile.REGTYPE + info.size = len(content) + tar.addfile(info, io.BytesIO(content)) + elif member_type == "symlink": + info.type = tarfile.SYMTYPE + info.linkname = "target" + tar.addfile(info) + elif member_type == "hardlink": + info.type = tarfile.LNKTYPE + info.linkname = "target" + tar.addfile(info) + buf.seek(0) + + tmp = tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) + tmp.write(buf.read()) + tmp.flush() + tmp.close() + return Path(tmp.name) + + +def _make_info(name: str, member_type: str = "file") -> tarfile.TarInfo: + """Return a TarInfo with the given name and type.""" + info = tarfile.TarInfo(name=name) + if member_type == "file": + info.type = tarfile.REGTYPE + elif member_type == "symlink": + info.type = tarfile.SYMTYPE + info.linkname = "target" + elif member_type == "hardlink": + info.type = tarfile.LNKTYPE + info.linkname = "target" + elif member_type == "device": + info.type = tarfile.CHRTYPE + return info + + +# --------------------------------------------------------------------------- +# SEC-R-01 / _validate_release_member: path traversal & type checks +# --------------------------------------------------------------------------- + + +class TestValidateReleaseMember: + """Unit tests for _validate_release_member().""" + + def test_valid_regular_file_passes(self): + """A plain regular file with a safe relative name is accepted.""" + info = _make_info("azlin-v1.0/azlin", member_type="file") + # Should not raise + _validate_release_member(info) + + def test_rejects_absolute_path_member(self): + """A member with an absolute path is rejected with SecurityError.""" + info = _make_info("/etc/cron.d/evil", member_type="file") + with pytest.raises(SecurityError, match="Absolute path"): + _validate_release_member(info) + + def test_rejects_path_traversal_member(self): + """A member with '..' in its path is rejected with SecurityError.""" + info = _make_info("../../etc/passwd", member_type="file") + with pytest.raises(SecurityError, match="Path traversal"): + _validate_release_member(info) + + def test_rejects_path_traversal_nested(self): + """Traversal nested inside a subdirectory is also rejected.""" + info = _make_info("legitimate/../../etc/shadow", member_type="file") + with pytest.raises(SecurityError, match="Path traversal"): + _validate_release_member(info) + + def test_allows_dotdot_in_filename_component(self): + """'foo..bar' (dotdot inside a component) is NOT traversal — allowed.""" + info = _make_info("foo..bar/azlin", member_type="file") + # Should not raise — 'foo..bar' is not the same as '..' + _validate_release_member(info) + + @pytest.mark.skipif( + sys.version_info >= (3, 12), + reason="filter='data' handles this on Python 3.12+", + ) + def test_rejects_symlink_member_on_older_python(self): + """On Python < 3.12 a symlink member is rejected with SecurityError.""" + info = _make_info("azlin", member_type="symlink") + with pytest.raises(SecurityError, match="Non-regular-file"): + _validate_release_member(info) + + @pytest.mark.skipif( + sys.version_info >= (3, 12), + reason="filter='data' handles this on Python 3.12+", + ) + def test_rejects_hardlink_member_on_older_python(self): + """On Python < 3.12 a hard-link member is rejected with SecurityError.""" + info = _make_info("azlin", member_type="hardlink") + with pytest.raises(SecurityError, match="Non-regular-file"): + _validate_release_member(info) + + @pytest.mark.skipif( + sys.version_info >= (3, 12), + reason="filter='data' handles this on Python 3.12+", + ) + def test_rejects_device_file_on_older_python(self): + """On Python < 3.12 a device-file member is rejected with SecurityError.""" + info = _make_info("azlin", member_type="device") + with pytest.raises(SecurityError, match="Non-regular-file"): + _validate_release_member(info) + + +# --------------------------------------------------------------------------- +# _is_release_binary_member: predicate +# --------------------------------------------------------------------------- + + +class TestIsReleaseBinaryMember: + def test_exact_name(self): + assert _is_release_binary_member("azlin") is True + + def test_path_ending_in_azlin(self): + assert _is_release_binary_member("dist/azlin") is True + + def test_version_dir_ending_in_azlin(self): + assert _is_release_binary_member("azlin-1.2.3/azlin") is True + + def test_does_not_match_different_binary(self): + assert _is_release_binary_member("azlinx") is False + + def test_does_not_match_partial_suffix(self): + assert _is_release_binary_member("src/azlin.sh") is False + + +# --------------------------------------------------------------------------- +# SEC-R-04 / _extract_release_binary: no binary in archive +# --------------------------------------------------------------------------- + + +class TestExtractReleaseBinary: + def test_raises_when_no_binary_in_archive(self, tmp_path): + """An archive with no azlin binary raises SecurityError.""" + tarball = _make_tarball([("README.txt", b"hello", "file")]) + try: + with pytest.raises(SecurityError, match="azlin binary not found"): + _extract_release_binary(tarball, tmp_path) + finally: + tarball.unlink(missing_ok=True) + + def test_extracts_binary_to_normalised_name(self, tmp_path): + """The binary is always written as 'azlin', regardless of member name.""" + content = b"fake-binary-content" + tarball = _make_tarball([("dist/v1.2.3/azlin", content, "file")]) + try: + _extract_release_binary(tarball, tmp_path) + output = tmp_path / "azlin" + assert output.exists(), "azlin binary should be extracted" + assert output.read_bytes() == content + finally: + tarball.unlink(missing_ok=True) + + def test_rejects_traversal_member_named_like_binary(self, tmp_path): + """A member like '../../azlin' is rejected even though it ends with /azlin.""" + tarball = _make_tarball([("../../azlin", b"evil", "file")]) + try: + with pytest.raises(SecurityError, match="Path traversal"): + _extract_release_binary(tarball, tmp_path) + finally: + tarball.unlink(missing_ok=True) + + @pytest.mark.skipif( + sys.version_info >= (3, 12), + reason="filter='data' handles symlinks on Python 3.12+", + ) + def test_rejects_symlink_named_like_binary(self, tmp_path): + """A symlink member named 'azlin' is rejected on Python < 3.12.""" + tarball = _make_tarball([("azlin", b"", "symlink")]) + try: + with pytest.raises(SecurityError, match="Non-regular-file"): + _extract_release_binary(tarball, tmp_path) + finally: + tarball.unlink(missing_ok=True) + + @pytest.mark.skipif( + sys.version_info < (3, 12), + reason="filter='data' only available on Python 3.12+", + ) + def test_uses_data_filter_on_python_312_plus(self, tmp_path): + """On Python >= 3.12 tar.extract() is called with filter='data'.""" + content = b"fake-binary-content" + tarball = _make_tarball([("azlin", content, "file")]) + try: + with patch("azlin.rust_bridge._PY312_PLUS", True): + with patch("tarfile.TarFile.extract") as mock_extract: + _extract_release_binary(tarball, tmp_path) + _, kwargs = mock_extract.call_args + assert kwargs.get("filter") == "data", ( + "filter='data' must be passed to tar.extract() on Python 3.12+" + ) + finally: + tarball.unlink(missing_ok=True) + + def test_only_binary_member_extracted(self, tmp_path): + """Only the binary member is extracted; other members are skipped.""" + content = b"fake-binary-content" + tarball = _make_tarball( + [ + ("README.txt", b"ignore me", "file"), + ("azlin", content, "file"), + ("LICENSE", b"also ignore", "file"), + ] + ) + try: + _extract_release_binary(tarball, tmp_path) + assert (tmp_path / "azlin").exists() + assert not (tmp_path / "README.txt").exists() + assert not (tmp_path / "LICENSE").exists() + finally: + tarball.unlink(missing_ok=True) + + +# --------------------------------------------------------------------------- +# SEC-R-06 / _download_from_release: temp file cleanup +# --------------------------------------------------------------------------- + + +class TestTempFileCleanup: + """Verify that temp files are always cleaned up, even on extraction failure.""" + + def test_temp_file_cleaned_up_on_security_error(self, tmp_path, monkeypatch): + """If SecurityError is raised during extraction, the temp file is removed.""" + import azlin.rust_bridge as rb + + captured_tmp: list[Path] = [] + + original_mktemp = tempfile.NamedTemporaryFile + + def tracking_mktemp(*args, **kwargs): + f = original_mktemp(*args, **kwargs) + captured_tmp.append(Path(f.name)) + return f + + monkeypatch.setattr(tempfile, "NamedTemporaryFile", tracking_mktemp) + + # Simulate a SecurityError during extraction + def bad_extract(tmp_path, destination): + raise SecurityError("simulated traversal attack") + + monkeypatch.setattr(rb, "_extract_release_binary", bad_extract) + monkeypatch.setattr(rb, "MANAGED_BIN_DIR", tmp_path) + monkeypatch.setattr(rb, "MANAGED_BIN", tmp_path / "azlin") + + # Provide a fake release list and download URL + fake_releases = [ + { + "tag_name": "v0.1.0-rust", + "assets": [ + { + "name": f"azlin-{rb._platform_suffix() or 'linux-x86_64'}.tar.gz", + "browser_download_url": "http://example.com/azlin.tar.gz", + } + ], + } + ] + + with patch("azlin.rust_bridge.urllib.request.urlopen") as mock_urlopen: + mock_resp = MagicMock() + mock_resp.__enter__ = lambda s: s + mock_resp.__exit__ = MagicMock(return_value=False) + mock_resp.read.return_value = ( + __import__("json").dumps(fake_releases).encode() + ) + mock_urlopen.return_value = mock_resp + + with patch("azlin.rust_bridge.urllib.request.urlretrieve"): + # SecurityError should propagate (not be caught) + with pytest.raises(SecurityError): + rb._download_from_release() + + # The temp file must have been deleted despite the SecurityError + for tmp in captured_tmp: + assert not tmp.exists(), ( + f"Temp file {tmp} was not cleaned up after SecurityError" + ) + + +# --------------------------------------------------------------------------- +# SEC-R-07: SecurityError NOT swallowed by the download handler +# --------------------------------------------------------------------------- + + +class TestSecurityErrorPropagation: + """SecurityError must propagate out of _download_from_release() uncaught.""" + + def test_security_error_propagates_from_download_handler( + self, tmp_path, monkeypatch + ): + """_download_from_release() must NOT catch SecurityError.""" + import azlin.rust_bridge as rb + + monkeypatch.setattr(rb, "MANAGED_BIN_DIR", tmp_path) + monkeypatch.setattr(rb, "MANAGED_BIN", tmp_path / "azlin") + + def evil_extract(tmp_path, destination): + raise SecurityError("path traversal detected") + + monkeypatch.setattr(rb, "_extract_release_binary", evil_extract) + + fake_releases = [ + { + "tag_name": "v0.1.0-rust", + "assets": [ + { + "name": f"azlin-{rb._platform_suffix() or 'linux-x86_64'}.tar.gz", + "browser_download_url": "http://example.com/azlin.tar.gz", + } + ], + } + ] + + with patch("azlin.rust_bridge.urllib.request.urlopen") as mock_urlopen: + mock_resp = MagicMock() + mock_resp.__enter__ = lambda s: s + mock_resp.__exit__ = MagicMock(return_value=False) + mock_resp.read.return_value = ( + __import__("json").dumps(fake_releases).encode() + ) + mock_urlopen.return_value = mock_resp + + with patch("azlin.rust_bridge.urllib.request.urlretrieve"): + with pytest.raises(SecurityError, match="path traversal detected"): + rb._download_from_release()