Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "azlin"
version = "2.6.22"
version = "2.6.23"
description = "Azure VM fleet management CLI - provision, manage, and monitor development VMs"
requires-python = ">=3.11"
authors = [
Expand Down
109 changes: 92 additions & 17 deletions src/azlin/rust_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,31 @@
3. Exit with error
"""

import copy
import json
import os
import platform
import shutil
import stat
import subprocess
import sys
import tarfile
import tempfile
import urllib.error
import urllib.request
from pathlib import Path
from pathlib import Path, PurePosixPath

GITHUB_REPO = "rysweet/azlin"
MANAGED_BIN_DIR = Path.home() / ".azlin" / "bin"
MANAGED_BIN = MANAGED_BIN_DIR / "azlin"

# Computed once at import time; used to select tar extraction strategy
_PY312_PLUS = sys.version_info >= (3, 12)


class SecurityError(Exception):
"""Raised when a security violation is detected during binary installation."""


def _platform_suffix() -> str | None:
"""Map current platform to GitHub Release asset suffix."""
Expand Down Expand Up @@ -80,11 +92,75 @@ def _find_rust_binary() -> str | None:
return None


def _is_release_binary_member(name: str) -> bool:
"""Return True if the tar member name corresponds to the azlin binary.

Pure predicate — no I/O.
"""
return name == "azlin" or name.endswith("/azlin")


def _validate_release_member(member: tarfile.TarInfo) -> None:
"""Raise SecurityError if a tar member is unsafe to extract.

Checks performed:
- Absolute path (e.g. /etc/cron.d/evil)
- Parent-directory traversal (e.g. ../../usr/bin/evil)
- Non-regular-file on Python < 3.12 (symlinks, device files, hard links)

On Python >= 3.12 filter='data' handles non-regular-file filtering at
extraction time, so only path checks are required here.
"""
path = PurePosixPath(member.name)

if path.is_absolute():
raise SecurityError(f"Absolute path in archive: {member.name!r}")

if ".." in path.parts:
raise SecurityError(f"Path traversal in archive: {member.name!r}")

if not _PY312_PLUS:
# filter='data' is unavailable before 3.12 — check member type manually
if not member.isfile():
raise SecurityError(
f"Non-regular-file in archive: {member.name!r} (type={member.type})"
)


def _extract_release_binary(tmp_path: Path, destination: Path) -> None:
"""Extract only the azlin binary from a release tarball.

Validates every member before extraction and normalises the output
filename to 'azlin' regardless of the member's name in the archive.

Raises:
SecurityError: If any tar member fails validation or the binary is
absent from the archive.
"""
with tarfile.open(tmp_path, "r:gz") as tar:
for member in tar.getmembers():
if not _is_release_binary_member(member.name):
continue

_validate_release_member(member)

# Normalise name: always write to destination/azlin regardless of
# the member's original name inside the archive (defence-in-depth).
safe_member = copy.copy(member)
safe_member.name = "azlin"

if _PY312_PLUS:
tar.extract(safe_member, path=str(destination), filter="data")
else:
tar.extract(safe_member, path=str(destination))

return # Successfully extracted exactly one binary — done

raise SecurityError("azlin binary not found in archive")


def _download_from_release() -> str | None:
"""Download pre-built binary from GitHub Releases."""
import tarfile
import tempfile

suffix = _platform_suffix()
if not suffix:
return None
Expand All @@ -95,10 +171,8 @@ def _download_from_release() -> str | None:
api_url, headers={"Accept": "application/vnd.github+json"}
) # noqa: S310
with urllib.request.urlopen(req, timeout=10) as resp: # noqa: S310 # nosec B310
import json

releases = json.loads(resp.read())
except Exception:
except (urllib.error.URLError, OSError):
return None

# Find the latest Rust release asset for this platform
Expand All @@ -125,28 +199,29 @@ def _download_from_release() -> str | None:
sys.stderr.write(
f"azlin: installing Rust binary v{version} from GitHub Releases...\n"
)
tmp_path = None
try:
with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as tmp:
urllib.request.urlretrieve(download_url, tmp.name) # noqa: S310 # nosec B310
tmp_path = tmp.name
tmp_path = Path(tmp.name)

with tarfile.open(tmp_path, "r:gz") as tar:
for member in tar.getmembers():
if member.name.endswith("/azlin") or member.name == "azlin":
member.name = "azlin"
tar.extract(member, path=str(MANAGED_BIN_DIR))
break
urllib.request.urlretrieve(download_url, str(tmp_path)) # noqa: S310 # nosec B310

os.unlink(tmp_path)
# SecurityError propagates uncaught — installation is aborted loudly
_extract_release_binary(tmp_path, MANAGED_BIN_DIR)

if MANAGED_BIN.exists():
MANAGED_BIN.chmod(
MANAGED_BIN.stat().st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH
)
sys.stderr.write(f"azlin: installed to {MANAGED_BIN}\n")
return str(MANAGED_BIN)
except Exception as e:
except (urllib.error.URLError, OSError) as e:
sys.stderr.write(f"azlin: download failed: {e}\n")
finally:
# Always clean up the temp file — even if SecurityError is raised
if tmp_path is not None and tmp_path.exists():
tmp_path.unlink(missing_ok=True)

return None


Expand Down
Loading