From 4978ef6bbd8e4c64a7f22585468f8fe62a0a2e00 Mon Sep 17 00:00:00 2001 From: Alex Lowe Date: Mon, 6 Apr 2026 10:50:05 -0400 Subject: [PATCH] chore: POC for a lxd provider with a losetup server on the host --- imagecraft/cli.py | 3 + imagecraft/losetup/__init__.py | 145 ++++++ imagecraft/losetup/server/__init__.py | 19 + imagecraft/losetup/server/__main__.py | 19 + imagecraft/losetup/server/_cgroup.py | 50 ++ imagecraft/losetup/server/_lxd.py | 98 ++++ imagecraft/losetup/server/_server.py | 201 ++++++++ imagecraft/plugins/mmdebstrap_plugin.py | 2 +- imagecraft/services/image.py | 70 +-- imagecraft/services/provider.py | 55 +++ snap/hooks/configure | 16 + snap/snapcraft.yaml | 8 + spread.yaml | 2 +- tests/integration/losetup/__init__.py | 0 tests/integration/losetup/server/__init__.py | 0 .../integration/losetup/server/test_cgroup.py | 83 ++++ .../integration/losetup/server/test_server.py | 231 +++++++++ tests/integration/losetup/test_client.py | 79 +++ tests/integration/services/test_provider.py | 329 +++++++++++++ tests/unit/losetup/__init__.py | 0 tests/unit/losetup/server/__init__.py | 0 tests/unit/losetup/server/test_cgroup.py | 80 ++++ tests/unit/losetup/server/test_lxd.py | 201 ++++++++ tests/unit/losetup/server/test_server.py | 373 +++++++++++++++ tests/unit/losetup/test_client.py | 451 ++++++++++++++++++ tests/unit/services/test_image.py | 87 +--- tests/unit/services/test_provider.py | 180 +++++++ tests/unit/test_configure.py | 90 ++++ 28 files changed, 2748 insertions(+), 124 deletions(-) create mode 100644 imagecraft/losetup/__init__.py create mode 100644 imagecraft/losetup/server/__init__.py create mode 100644 imagecraft/losetup/server/__main__.py create mode 100644 imagecraft/losetup/server/_cgroup.py create mode 100644 imagecraft/losetup/server/_lxd.py create mode 100644 imagecraft/losetup/server/_server.py create mode 100644 tests/integration/losetup/__init__.py create mode 100644 tests/integration/losetup/server/__init__.py create mode 100644 tests/integration/losetup/server/test_cgroup.py create mode 100644 tests/integration/losetup/server/test_server.py create mode 100644 tests/integration/losetup/test_client.py create mode 100644 tests/integration/services/test_provider.py create mode 100644 tests/unit/losetup/__init__.py create mode 100644 tests/unit/losetup/server/__init__.py create mode 100644 tests/unit/losetup/server/test_cgroup.py create mode 100644 tests/unit/losetup/server/test_lxd.py create mode 100644 tests/unit/losetup/server/test_server.py create mode 100644 tests/unit/losetup/test_client.py create mode 100644 tests/unit/services/test_provider.py create mode 100644 tests/unit/test_configure.py diff --git a/imagecraft/cli.py b/imagecraft/cli.py index 46c6bd8b..1a25d7a6 100644 --- a/imagecraft/cli.py +++ b/imagecraft/cli.py @@ -48,6 +48,9 @@ def register_services() -> None: ServiceFactory.register( "project", "ImagecraftProjectService", module="imagecraft.services.project" ) + ServiceFactory.register( + "provider", "Provider", module="imagecraft.services.provider" + ) def _create_app() -> Imagecraft: diff --git a/imagecraft/losetup/__init__.py b/imagecraft/losetup/__init__.py new file mode 100644 index 00000000..43b716eb --- /dev/null +++ b/imagecraft/losetup/__init__.py @@ -0,0 +1,145 @@ +# Copyright 2026 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""losetup client: attach/detach loop devices via the loopserver REST API when +running inside an LXD container, or via the real losetup binary otherwise. +""" + +import http.client +import json +import pathlib +import socket +import subprocess +import urllib.parse + +from craft_cli import emit + +_LXD_GUEST_SOCK = "/dev/lxd/sock" +_LOOPSERVER_SOCK = "/dev/losetup/sock" + + +def _is_lxd_container() -> bool: + """Return True if we are running inside an LXD container.""" + if not pathlib.Path(_LXD_GUEST_SOCK).exists(): + emit.debug(f"LXD guest socket {_LXD_GUEST_SOCK!r} not found; not in a container") + return False + try: + conn = http.client.HTTPConnection("lxd") + conn.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + conn.sock.connect(_LXD_GUEST_SOCK) + conn.request("GET", "/1.0") + data = json.loads(conn.getresponse().read()) + instance_type = data.get("instance_type") + if instance_type == "container": + emit.debug( + f"Detected LXD container via {_LXD_GUEST_SOCK!r} " + f"(instance_type={instance_type!r})" + ) + return True + emit.debug( + f"LXD guest socket present but instance_type={instance_type!r}; " + "not treating as container" + ) + return False + except Exception as exc: # noqa: BLE001 + emit.debug( + f"Failed to query LXD guest socket {_LXD_GUEST_SOCK!r}: {exc}; " + "assuming not in a container" + ) + return False + + +def _loopserver_request(endpoint: str, path: str) -> list[str]: + """POST to the loopserver REST API and return the metadata list.""" + query = urllib.parse.urlencode({"path": path}) + conn = http.client.HTTPConnection("loopserver") + conn.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + conn.sock.connect(_LOOPSERVER_SOCK) + conn.request("POST", f"{endpoint}?{query}") + response = json.loads(conn.getresponse().read()) + if response.get("error_code"): + raise RuntimeError(response.get("error", "unknown error from loopserver")) + return response["metadata"] + + +def _losetup_attach(path: pathlib.Path) -> list[str]: + """Attach *path* as a loop device using the real losetup, return all device paths.""" + loop_dev = subprocess.run( + ["losetup", "--find", "--show", "--partscan", str(path)], + capture_output=True, + text=True, + check=True, + ).stdout.strip() + + lsblk_data = json.loads( + subprocess.run( + ["lsblk", "--json", "--output", "NAME,TYPE", loop_dev], + capture_output=True, + text=True, + check=True, + ).stdout + ) + devices = [loop_dev] + for child in lsblk_data["blockdevices"][0].get("children", []): + if child.get("type") == "part": + devices.append(f"/dev/{child['name']}") + return devices + + +def _losetup_detach(device: str) -> list[str]: + """Detach *device* using the real losetup, return all device paths that were removed.""" + lsblk_data = json.loads( + subprocess.run( + ["lsblk", "--json", "--output", "NAME,TYPE", device], + capture_output=True, + text=True, + check=True, + ).stdout + ) + devices = [device] + for child in lsblk_data["blockdevices"][0].get("children", []): + if child.get("type") == "part": + devices.append(f"/dev/{child['name']}") + + subprocess.run(["losetup", "--detach", device], check=True) + return devices + + +def attach(path: pathlib.Path) -> list[str]: + """Attach *path* as a loop device and return all device paths (loop + partitions). + + When running inside an LXD container, delegates to the loopserver REST API. + Otherwise calls the real losetup binary directly. + """ + if _is_lxd_container(): + emit.debug(f"Attaching {path} via loopserver at {_LOOPSERVER_SOCK!r}") + return _loopserver_request("/1.0/attach", str(path)) + emit.debug(f"Attaching {path} via losetup directly") + return _losetup_attach(path) + + +def detach(device: str) -> list[str]: + """Detach the loop device *device* and return all device paths that were removed. + + When running inside an LXD container, delegates to the loopserver REST API. + Otherwise calls the real losetup binary directly. + """ + if _is_lxd_container(): + emit.debug(f"Detaching {device} via loopserver at {_LOOPSERVER_SOCK!r}") + return _loopserver_request("/1.0/detach", device) + emit.debug(f"Detaching {device} via losetup directly") + return _losetup_detach(device) + + +__all__ = ["attach", "detach"] diff --git a/imagecraft/losetup/server/__init__.py b/imagecraft/losetup/server/__init__.py new file mode 100644 index 00000000..0fe43623 --- /dev/null +++ b/imagecraft/losetup/server/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2026 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""losetup server package.""" + +from imagecraft.losetup.server._server import main + +__all__ = ["main"] diff --git a/imagecraft/losetup/server/__main__.py b/imagecraft/losetup/server/__main__.py new file mode 100644 index 00000000..7638ee1f --- /dev/null +++ b/imagecraft/losetup/server/__main__.py @@ -0,0 +1,19 @@ +# Copyright 2026 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Allow running the losetup server with ``python3 -m imagecraft.losetup.server``.""" + +from imagecraft.losetup.server._server import main + +main() diff --git a/imagecraft/losetup/server/_cgroup.py b/imagecraft/losetup/server/_cgroup.py new file mode 100644 index 00000000..dfab105e --- /dev/null +++ b/imagecraft/losetup/server/_cgroup.py @@ -0,0 +1,50 @@ +# Copyright 2026 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""cgroup v2 helpers for determining LXD container membership.""" + +import socket +import struct + + +def peer_cgroup(conn: socket.socket) -> str: + """Return the cgroup v2 string for the process on the other end of *conn*.""" + cred = conn.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED, struct.calcsize("3i")) + pid, _uid, _gid = struct.unpack("3i", cred) + with open(f"/proc/{pid}/cgroup") as f: + return f.read() + + +def parse_lxd_location(cgroup: str) -> tuple[str, str] | None: + """Return (project, container) from a cgroup v2 string, or None. + + The cgroup v2 entry always has the form ``0::$PATH``. Within that path we + walk the components in reverse to find the innermost one that begins with + ``lxc.payload.``, then split it to recover the LXD project and container. + LXD project names are restricted to [a-zA-Z0-9-] (no underscores), so the + first ``_`` is always the delimiter between project and container name. + """ + for line in cgroup.splitlines(): + if not line.startswith("0::"): + continue + path = line[len("0::"):] + for component in reversed(path.split("/")): + if not component.startswith("lxc.payload."): + continue + project_container = component.split(".")[-1] + parts = project_container.split("_", 1) + if len(parts) == 2: + return parts[0], parts[1] + break # found the v2 line but no lxc.payload component + return None diff --git a/imagecraft/losetup/server/_lxd.py b/imagecraft/losetup/server/_lxd.py new file mode 100644 index 00000000..f0064f7a --- /dev/null +++ b/imagecraft/losetup/server/_lxd.py @@ -0,0 +1,98 @@ +# Copyright 2026 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""LXD REST API helpers and path translation.""" + +import http.client +import json +import os +import pathlib +import socket + +LXD_SOCKET = "/var/snap/lxd/common/lxd/unix.socket" + + +def _lxd_connect() -> http.client.HTTPConnection: + conn = http.client.HTTPConnection("lxd") + conn.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + conn.sock.connect(LXD_SOCKET) + return conn + + +def lxd_get(path: str) -> dict: + """Make a GET request to the LXD REST API and return the parsed response.""" + conn = _lxd_connect() + conn.request("GET", path) + return json.loads(conn.getresponse().read()) + + +def lxd_patch(path: str, body: dict) -> dict: + """Make a PATCH request to the LXD REST API and return the parsed response.""" + conn = _lxd_connect() + data = json.dumps(body).encode() + conn.request("PATCH", path, body=data, headers={"Content-Type": "application/json"}) + return json.loads(conn.getresponse().read()) + + +def find_free_loop_slot(devices: dict) -> int: + """Find the smallest N such that no ``imagecraft-loopN*`` device exists.""" + used: set[int] = set() + for name in devices: + if name.startswith("imagecraft-loop"): + num_str = name[len("imagecraft-loop"):].split("p")[0] + if num_str.isdigit(): + used.add(int(num_str)) + n = 0 + while n in used: + n += 1 + return n + + +def convert_container_path_to_host_path( + project: str, container: str, container_path: str +) -> pathlib.Path: + """Translate *container_path* to an absolute path on the host. + + Queries the LXD API for the container's disk devices, finds the one with + the longest ``path`` prefix that matches *container_path* and has a + ``source``, then substitutes that source for the prefix. + + Raises ``ValueError`` if no sourced device covers the given path. + """ + data = lxd_get(f"/1.0/instances/{container}?project={project}") + devices = data["metadata"]["expanded_devices"] + + # Collect disk devices that bind-mount a host source path. + sourced = [ + (dev["path"].rstrip("/"), dev["source"]) + for dev in devices.values() + if dev.get("type") == "disk" and "source" in dev + ] + + # Normalise the container path and find the longest matching mount prefix. + norm = os.path.normpath(container_path) + best_mount = best_source = None + for mount, source in sourced: + if norm == mount or norm.startswith(mount + "/"): + if best_mount is None or len(mount) > len(best_mount): + best_mount = mount + best_source = source + + if best_mount is None: + raise ValueError( + f"{container_path!r} is not under any sourced device in {container!r}" + ) + + relative = norm[len(best_mount):] # may be "" if exact match + return pathlib.Path(best_source + relative) diff --git a/imagecraft/losetup/server/_server.py b/imagecraft/losetup/server/_server.py new file mode 100644 index 00000000..cf52cf53 --- /dev/null +++ b/imagecraft/losetup/server/_server.py @@ -0,0 +1,201 @@ +# Copyright 2026 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""losetup server: accept one connection, handle a loopback device request, then exit. + +Socket activation is used (systemd passes a pre-bound socket via LISTEN_FDS). +""" + +import http.server +import json +import os +import pathlib +import socket +import subprocess +import urllib.parse + +from imagecraft.losetup.server._cgroup import parse_lxd_location, peer_cgroup +from imagecraft.losetup.server._lxd import ( + convert_container_path_to_host_path, + find_free_loop_slot, + lxd_get, + lxd_patch, +) + +_SD_LISTEN_FDS_START = 3 + + +def _get_listening_socket() -> socket.socket: + """Return the listening socket passed by systemd socket activation.""" + listen_pid = int(os.environ.get("LISTEN_PID", 0)) + listen_fds = int(os.environ.get("LISTEN_FDS", 0)) + if listen_pid != os.getpid() or listen_fds < 1: + raise RuntimeError( + f"expected socket activation (LISTEN_PID={listen_pid}, " + f"LISTEN_FDS={listen_fds}, pid={os.getpid()})" + ) + return socket.fromfd(_SD_LISTEN_FDS_START, socket.AF_UNIX, socket.SOCK_STREAM) + + +def _handle_attach(project: str, container: str, container_path: str) -> list[str]: + host_path = convert_container_path_to_host_path(project, container, container_path) + + # Attach the image file as a loop device. + loop_dev = subprocess.run( + ["losetup", "--find", "--show", "--partscan", str(host_path)], + capture_output=True, + text=True, + check=True, + ).stdout.strip() + + # Discover partitions via lsblk. + lsblk_data = json.loads( + subprocess.run( + ["lsblk", "--json", "--output", "NAME,TYPE", loop_dev], + capture_output=True, + text=True, + check=True, + ).stdout + ) + loop_name = pathlib.Path(loop_dev).name # e.g. "loop133" + partitions = [ + child["name"] + for child in lsblk_data["blockdevices"][0].get("children", []) + if child.get("type") == "part" + ] + + # Find a free in-container imagecraft-loopN slot. + instance_data = lxd_get(f"/1.0/instances/{container}?project={project}") + current_devices = instance_data["metadata"]["expanded_devices"] + slot = find_free_loop_slot(current_devices) + loop_alias = f"imagecraft-loop{slot}" # e.g. "imagecraft-loop0" + + # Build the new device entries to add to the container. + new_devices: dict[str, dict] = { + loop_alias: { + "type": "unix-block", + "path": f"/dev/{loop_alias}", + "source": loop_dev, + } + } + for part_name in partitions: + part_suffix = part_name[len(loop_name):] # e.g. "p1" + dev_alias = f"{loop_alias}{part_suffix}" + new_devices[dev_alias] = { + "type": "unix-block", + "path": f"/dev/{dev_alias}", + "source": f"/dev/{part_name}", + } + + lxd_patch(f"/1.0/instances/{container}?project={project}", {"devices": new_devices}) + + return [dev["path"] for dev in new_devices.values()] + + +def _handle_detach(project: str, container: str, container_dev_path: str) -> list[str]: + # Find the LXD device name matching this in-container path. + instance_data = lxd_get(f"/1.0/instances/{container}?project={project}") + current_devices = instance_data["metadata"]["expanded_devices"] + + loop_dev = next( + ( + dev["source"] + for dev in current_devices.values() + if dev.get("type") == "unix-block" and dev.get("path") == container_dev_path + ), + None, + ) + if loop_dev is None: + raise ValueError( + f"no unix-block device with path {container_dev_path!r} in {container!r}" + ) + + # Remove all container devices backed by this loop device or its partitions. + devices_to_remove = { + name: None + for name, dev in current_devices.items() + if dev.get("type") == "unix-block" + and ( + dev.get("source") == loop_dev + or dev.get("source", "").startswith(loop_dev + "p") + ) + } + removed_paths = [current_devices[name]["path"] for name in devices_to_remove] + lxd_patch( + f"/1.0/instances/{container}?project={project}", + {"devices": devices_to_remove}, + ) + + subprocess.run(["losetup", "--detach", loop_dev], check=True) + + return removed_paths + + +class _RequestHandler(http.server.BaseHTTPRequestHandler): + """HTTP request handler for the loopserver REST API.""" + + def __init__(self, project: str, container: str, *args, **kwargs) -> None: + self._project = project + self._container = container + super().__init__(*args, **kwargs) + + def do_POST(self) -> None: # noqa: N802 + parsed = urllib.parse.urlparse(self.path) + params = urllib.parse.parse_qs(parsed.query) + path_values = params.get("path", []) + if not path_values: + self._send_error(400, "missing 'path' query parameter") + return + path = path_values[0] + + try: + if parsed.path == "/1.0/attach": + devices = _handle_attach(self._project, self._container, path) + elif parsed.path == "/1.0/detach": + devices = _handle_detach(self._project, self._container, path) + else: + self._send_error(404, f"unknown endpoint {parsed.path!r}") + return + except Exception as exc: # noqa: BLE001 + self._send_error(500, str(exc)) + return + + self._send_json(200, {"status": "Success", "status_code": 200, "metadata": devices}) + + def _send_json(self, code: int, body: dict) -> None: + data = json.dumps(body).encode() + self.send_response(code) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(data))) + self.end_headers() + self.wfile.write(data) + + def _send_error(self, code: int, message: str) -> None: + self._send_json(code, {"error": message, "error_code": code}) + + def log_message(self, format: str, *args: object) -> None: # noqa: A002 + pass # suppress default stderr logging + + +def main() -> None: + """Entry point for the losetup server daemon.""" + sock = _get_listening_socket() + conn, addr = sock.accept() + with conn: + location = parse_lxd_location(peer_cgroup(conn)) + if location is None or location[0] != "imagecraft": + return + project, container = location + _RequestHandler(project, container, conn, addr, None) + sock.close() diff --git a/imagecraft/plugins/mmdebstrap_plugin.py b/imagecraft/plugins/mmdebstrap_plugin.py index d70159ba..2caf5521 100644 --- a/imagecraft/plugins/mmdebstrap_plugin.py +++ b/imagecraft/plugins/mmdebstrap_plugin.py @@ -88,7 +88,7 @@ def get_build_commands(self) -> list[str]: cmd: list[str] = [ "mmdebstrap", f"--arch={self._part_info.target_arch}", - "--mode=root", + "--mode=fakeroot", f"--variant={options.mmdebstrap_variant}", "--format=dir", ] diff --git a/imagecraft/services/image.py b/imagecraft/services/image.py index 18bf48f7..f0781304 100644 --- a/imagecraft/services/image.py +++ b/imagecraft/services/image.py @@ -16,23 +16,19 @@ import atexit import contextlib -import json import pathlib import shutil -import subprocess import time from collections.abc import Mapping -from typing import Any, cast +from typing import cast from craft_application import AppMetadata, AppService, ServiceFactory from craft_cli import CraftError, emit +from imagecraft import losetup from imagecraft.models import Project from imagecraft.models.volume import PartitionSchema from imagecraft.pack import gptutil -from imagecraft.subprocesses import run - -_LOSETUP_BIN = "losetup" class ImageService(AppService): @@ -94,20 +90,11 @@ def create_images(self) -> Mapping[str, pathlib.Path]: return self._images - def _get_all_loop_devices(self) -> list[dict[str, Any]]: - """Return a list of all loop devices on the system.""" - try: - result = run(_LOSETUP_BIN, "--json") - return cast(list[dict[str, Any]], json.loads(result.stdout)["loopdevices"]) - except (subprocess.CalledProcessError, KeyError, json.JSONDecodeError): - return [] - def attach_images(self) -> Mapping[str, str]: """Attach all created images as loop devices. This method is idempotent. It will reuse existing loop devices if they - are already attached to the correct files, and clean up stale devices - pointing to deleted inodes. + are already attached to the correct files. """ if self._loop_devices: return self._loop_devices @@ -115,47 +102,16 @@ def attach_images(self) -> Mapping[str, str]: if self._images is None: raise ValueError("Images must be created before attaching.") - all_devices = self._get_all_loop_devices() - for name, image_path in self._images.items(): - attached_device: str | None = None - - # 1. Check for existing devices pointing to this file. - for dev in all_devices: - back_file = pathlib.Path(dev["back-file"]) - try: - if image_path.samefile(back_file): - attached_device = dev["name"] - emit.debug( - f"Reusing existing loop device {attached_device} for {image_path}" - ) - break - except FileNotFoundError: - # Stale inode: file deleted and recreated. - if back_file == image_path: - emit.debug( - f"Detaching stale loop device {dev['name']} for {image_path}" - ) - run(_LOSETUP_BIN, "-d", dev["name"]) - - # 2. Attach a fresh device if none was found/reused. - if not attached_device: - try: - attached_device = run( - _LOSETUP_BIN, - "--find", - "--show", - "--partscan", - str(image_path), - ).stdout.strip() - emit.debug(f"Attached {image_path} as {attached_device}") - except subprocess.CalledProcessError as err: - raise CraftError( - f"Failed to attach loop device for {image_path}.", - details=str(err), - resolution="Ensure loop devices are available and you have sufficient permissions (sudo).", - ) from err - + try: + attached_device = losetup.attach(image_path)[0] + emit.debug(f"Attached {image_path} as {attached_device}") + except Exception as err: + raise CraftError( + f"Failed to attach loop device for {image_path}.", + details=str(err), + resolution="Ensure loop devices are available and you have sufficient permissions (sudo).", + ) from err self._loop_devices[name] = attached_device if not self._atexit_registered: @@ -174,7 +130,7 @@ def detach_images(self) -> None: start_time = time.monotonic() while time.monotonic() - start_time < 10: # noqa: PLR2004 (10 seconds) try: - run(_LOSETUP_BIN, "-d", device) + losetup.detach(device) success = True break except Exception: # noqa: BLE001 diff --git a/imagecraft/services/provider.py b/imagecraft/services/provider.py index 083fe891..fcbd0ae1 100644 --- a/imagecraft/services/provider.py +++ b/imagecraft/services/provider.py @@ -14,14 +14,69 @@ """Imagecraft provider service.""" +import contextlib +import os +import pathlib +from collections.abc import Generator +from typing import Any + +import craft_platforms import craft_providers from craft_application.services.provider import ProviderService from craft_cli import CraftError, emit +from craft_providers.lxd import LXDInstance, LXDProvider class Provider(ProviderService): """Imagecraft-specific project service.""" + @contextlib.contextmanager + def instance( + self, + build_info: craft_platforms.BuildInfo, + *, + work_dir: pathlib.Path, + **kwargs: Any, + ) -> Generator[craft_providers.Executor, None, None]: + """Context manager for a provider instance. + + When using the LXD provider, also mounts $SNAP_DATA/losetup into + /dev/losetup inside the instance so the losetup server socket is + accessible. + """ + with super().instance(build_info, work_dir=work_dir, **kwargs) as instance: + if isinstance(self.get_provider(), LXDProvider) and isinstance( + instance, LXDInstance + ): + snap_data = os.environ.get("SNAP_DATA", "") + if snap_data: + losetup_host = pathlib.Path(snap_data) / "losetup" + losetup_host.mkdir(parents=True, exist_ok=True) + self._mount_with_shift(instance, losetup_host) + emit.debug(f"Mounted {losetup_host} -> /dev/losetup in instance") + else: + emit.debug("SNAP_DATA not set; skipping losetup mount") + yield instance + + @staticmethod + def _mount_with_shift(instance: LXDInstance, host_source: pathlib.Path) -> None: + """Mount host_source into /dev/losetup in the instance with UID/GID shifting. + + Uses pylxd directly to set shift=true, which remaps host UIDs/GIDs into + the container's namespace so the directory appears owned by root inside. + """ + device_name = "disk-dev-losetup" + target = "/dev/losetup" + lxd_inst = instance._client.instances.get(instance.instance_name) + if device_name not in lxd_inst.devices: + lxd_inst.devices[device_name] = { + "type": "disk", + "source": str(host_source), + "path": target, + "shift": "true", + } + lxd_inst.save(wait=True) + def get_provider(self, name: str | None = None) -> craft_providers.Provider: """Get the provider to use. This method is a workaround for #253. diff --git a/snap/hooks/configure b/snap/hooks/configure index 69099ddf..5562b42e 100644 --- a/snap/hooks/configure +++ b/snap/hooks/configure @@ -2,6 +2,8 @@ """Hook to configure imagecraft snap.""" +import grp +import os import sys import snaphelpers @@ -9,6 +11,19 @@ from craft_application.errors import CraftValidationError from craft_application.util import SnapConfig +def setup_losetup_dir() -> None: + """Create $SNAP_DATA/losetup and set lxd group ownership if available.""" + losetup_dir = os.path.join(os.environ["SNAP_DATA"], "losetup") + os.makedirs(losetup_dir, exist_ok=True) + + try: + lxd_gid = grp.getgrnam("lxd").gr_gid + except KeyError: + return + + os.chown(losetup_dir, -1, lxd_gid) + + def validate_snap_config() -> None: """Validate snap configuration.""" snap_config = snaphelpers.SnapConfigOptions(keys=["provider"]) @@ -23,4 +38,5 @@ def validate_snap_config() -> None: if __name__ == "__main__": + setup_losetup_dir() validate_snap_config() diff --git a/snap/snapcraft.yaml b/snap/snapcraft.yaml index ff50f1a3..9ae5e865 100644 --- a/snap/snapcraft.yaml +++ b/snap/snapcraft.yaml @@ -26,6 +26,14 @@ apps: PATH: "$SNAP/libexec/imagecraft:/snap/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" PERL5LIB: $SNAP/usr/share/perl5:$SNAP/usr/share/perl:$PERL5LIB completer: completion.sh + losetup-server: + command: bin/python3 -m imagecraft.losetup.server + daemon: simple + daemon-scope: system + sockets: + losetup-socket: + listen-stream: $SNAP_DATA/losetup/sock + socket-mode: 0660 build-packages: - git diff --git a/spread.yaml b/spread.yaml index 7d8c503c..d9be9d55 100644 --- a/spread.yaml +++ b/spread.yaml @@ -7,7 +7,7 @@ environment: LANGUAGE: "en" # build the snap with lxd PROJECT_PATH: /home/imagecraft - CRAFT_BUILD_ENVIRONMENT: host # Build in destructive mode until we can solve https://github.com/canonical/imagecraft/issues/253 + #CRAFT_BUILD_ENVIRONMENT: host # Build in destructive mode until we can solve https://github.com/canonical/imagecraft/issues/253 SNAPD_TESTING_TOOLS: $PROJECT_PATH/tools/external/tools PATH: /snap/bin:$PATH:$SNAPD_TESTING_TOOLS diff --git a/tests/integration/losetup/__init__.py b/tests/integration/losetup/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integration/losetup/server/__init__.py b/tests/integration/losetup/server/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integration/losetup/server/test_cgroup.py b/tests/integration/losetup/server/test_cgroup.py new file mode 100644 index 00000000..d2bab84e --- /dev/null +++ b/tests/integration/losetup/server/test_cgroup.py @@ -0,0 +1,83 @@ +# Copyright 2026 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Integration tests for imagecraft/losetup/server/_cgroup.py. + +peer_cgroup() reads SO_PEERCRED from a real socket and opens /proc//cgroup. +No root required (both ends of the socket pair are in the same process). +""" + +import os +import socket + +import pytest + +from imagecraft.losetup.server._cgroup import parse_lxd_location, peer_cgroup + + +class TestPeerCgroupIntegration: + def test_returns_string_for_current_process(self): + """peer_cgroup reads /proc//cgroup via SO_PEERCRED.""" + client, server = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) + try: + result = peer_cgroup(server) + finally: + client.close() + server.close() + + assert isinstance(result, str) + assert len(result) > 0 + + def test_matches_proc_cgroup_of_current_process(self): + """The cgroup string must match /proc//cgroup.""" + pid = os.getpid() + with open(f"/proc/{pid}/cgroup") as f: + expected = f.read() + + client, server = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) + try: + result = peer_cgroup(server) + finally: + client.close() + server.close() + + assert result == expected + + def test_cgroup_contains_0_line(self): + """The cgroup v2 unified hierarchy starts with '0::'.""" + client, server = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) + try: + cgroup = peer_cgroup(server) + finally: + client.close() + server.close() + + lines = cgroup.splitlines() + # At least one line (cgroup v2 pure or hybrid) + assert len(lines) >= 1 + + def test_parse_lxd_location_does_not_crash_on_real_cgroup(self): + """parse_lxd_location should not raise for any real cgroup string.""" + client, server = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) + try: + cgroup = peer_cgroup(server) + finally: + client.close() + server.close() + + # Should return None (we're not in an LXD container in tests) or a tuple + result = parse_lxd_location(cgroup) + assert result is None or ( + isinstance(result, tuple) and len(result) == 2 + ) diff --git a/tests/integration/losetup/server/test_server.py b/tests/integration/losetup/server/test_server.py new file mode 100644 index 00000000..f2a01e5e --- /dev/null +++ b/tests/integration/losetup/server/test_server.py @@ -0,0 +1,231 @@ +# Copyright 2026 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Integration tests for imagecraft/losetup/server/_server.py. + +Full round-trip: real socket pair, real HTTP request/response cycle, +with mocked LXD API and losetup calls so no LXD daemon is required. +Tests that need real losetup are marked requires_root. +""" + +import json +import os +import socket +import threading + +import pytest + +from imagecraft.losetup.server._server import ( + _RequestHandler, + _handle_attach, + _handle_detach, + main, +) + + +# ─── helpers ────────────────────────────────────────────────────────────────── + + +def _make_request(method: str, path: str) -> bytes: + return ( + f"{method} {path} HTTP/1.0\r\n" + f"Host: localhost\r\n" + f"\r\n" + ).encode() + + +def _send_and_recv(request_bytes: bytes, project: str, container: str) -> tuple[int, dict]: + """Send *request_bytes* to a _RequestHandler and return (status, body).""" + client, server = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) + try: + client.sendall(request_bytes) + _RequestHandler(project, container, server, ("", 0), None) + finally: + server.close() + + raw = b"" + client.settimeout(3.0) + try: + while True: + chunk = client.recv(4096) + if not chunk: + break + raw += chunk + except socket.timeout: + pass + finally: + client.close() + + header_end = raw.find(b"\r\n\r\n") + header_section = raw[:header_end].decode() + body_bytes = raw[header_end + 4:] + status_code = int(header_section.split("\r\n")[0].split()[1]) + return status_code, json.loads(body_bytes) + + +# ─── full HTTP round-trip (mocked LXD + subprocess) ────────────────────────── + + +class TestServerRoundTrip: + def test_attach_full_cycle(self, mocker): + mocker.patch( + "imagecraft.losetup.server._server._handle_attach", + return_value=["/dev/imagecraft-loop0", "/dev/imagecraft-loop0p1"], + ) + + req = _make_request("POST", "/1.0/attach?path=/root/project/disk.img") + code, body = _send_and_recv(req, "imagecraft", "my-container") + + assert code == 200 + assert body["status_code"] == 200 + assert "/dev/imagecraft-loop0" in body["metadata"] + assert "/dev/imagecraft-loop0p1" in body["metadata"] + + def test_detach_full_cycle(self, mocker): + mocker.patch( + "imagecraft.losetup.server._server._handle_detach", + return_value=["/dev/imagecraft-loop0", "/dev/imagecraft-loop0p1"], + ) + + req = _make_request("POST", "/1.0/detach?path=/dev/imagecraft-loop0") + code, body = _send_and_recv(req, "imagecraft", "my-container") + + assert code == 200 + assert "/dev/imagecraft-loop0" in body["metadata"] + + def test_missing_path_param(self): + req = _make_request("POST", "/1.0/attach") + code, body = _send_and_recv(req, "imagecraft", "my-container") + + assert code == 400 + assert body["error_code"] == 400 + + def test_unknown_endpoint(self): + req = _make_request("POST", "/1.0/badendpoint?path=/foo") + code, body = _send_and_recv(req, "imagecraft", "my-container") + + assert code == 404 + + def test_handler_exception_returns_500(self, mocker): + mocker.patch( + "imagecraft.losetup.server._server._handle_attach", + side_effect=ValueError("boom"), + ) + + req = _make_request("POST", "/1.0/attach?path=/root/disk.img") + code, body = _send_and_recv(req, "imagecraft", "my-container") + + assert code == 500 + assert "boom" in body["error"] + + +# ─── main() – non-imagecraft connection is silently rejected ────────────────── + + +class TestMainRejectsNonImagecraft: + def test_non_imagecraft_project_closes_without_dispatch(self, mocker, tmp_path): + """main() reads cgroup, finds project != 'imagecraft', returns immediately.""" + sock_path = tmp_path / "server.sock" + + # Create a listening socket ourselves + listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + listener.bind(str(sock_path)) + listener.listen(1) + + mocker.patch( + "imagecraft.losetup.server._server._get_listening_socket", + return_value=listener, + ) + # Peer cgroup claims a non-imagecraft project + mocker.patch( + "imagecraft.losetup.server._server.peer_cgroup", + return_value="0::/lxc.payload.snapcraft_some-container\n", + ) + mock_handler = mocker.patch( + "imagecraft.losetup.server._server._RequestHandler" + ) + + client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + client.connect(str(sock_path)) + + main() + + client.close() + listener.close() + + # Handler should NOT have been called + mock_handler.assert_not_called() + + def test_imagecraft_project_dispatches_to_handler(self, mocker, tmp_path): + """main() dispatches to _RequestHandler when project == 'imagecraft'.""" + sock_path = tmp_path / "server.sock" + + listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + listener.bind(str(sock_path)) + listener.listen(1) + + mocker.patch( + "imagecraft.losetup.server._server._get_listening_socket", + return_value=listener, + ) + mocker.patch( + "imagecraft.losetup.server._server.peer_cgroup", + return_value="0::/lxc.payload.imagecraft_my-container\n", + ) + mock_handler = mocker.patch( + "imagecraft.losetup.server._server._RequestHandler" + ) + + client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + client.connect(str(sock_path)) + + main() + + client.close() + listener.close() + + mock_handler.assert_called_once() + call_args = mock_handler.call_args[0] + assert call_args[0] == "imagecraft" + assert call_args[1] == "my-container" + + def test_none_location_closes_without_dispatch(self, mocker, tmp_path): + """main() returns immediately when parse_lxd_location returns None.""" + sock_path = tmp_path / "server.sock" + + listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + listener.bind(str(sock_path)) + listener.listen(1) + + mocker.patch( + "imagecraft.losetup.server._server._get_listening_socket", + return_value=listener, + ) + mocker.patch( + "imagecraft.losetup.server._server.peer_cgroup", + return_value="0::/\n", # not in container + ) + mock_handler = mocker.patch( + "imagecraft.losetup.server._server._RequestHandler" + ) + + client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + client.connect(str(sock_path)) + + main() + + client.close() + listener.close() + + mock_handler.assert_not_called() diff --git a/tests/integration/losetup/test_client.py b/tests/integration/losetup/test_client.py new file mode 100644 index 00000000..5d027b31 --- /dev/null +++ b/tests/integration/losetup/test_client.py @@ -0,0 +1,79 @@ +# Copyright 2026 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Integration tests for imagecraft/losetup/__init__.py. + +These tests call the real losetup binary and therefore require root. +""" + +import pathlib + +import pytest + +from imagecraft.losetup import _losetup_attach, _losetup_detach + + +@pytest.mark.requires_root +class TestLosetupAttachDetachIntegration: + def test_attach_returns_loop_device(self, tmp_path): + img = tmp_path / "disk.img" + # Create a 10 MiB sparse file + img.write_bytes(b"\0" * (10 * 1024 * 1024)) + + devices = _losetup_attach(img) + + try: + assert len(devices) >= 1 + loop_dev = devices[0] + assert loop_dev.startswith("/dev/loop") + assert pathlib.Path(loop_dev).exists() + finally: + import subprocess + + subprocess.run(["losetup", "--detach", devices[0]], check=False) + + def test_detach_removes_device(self, tmp_path): + img = tmp_path / "disk.img" + img.write_bytes(b"\0" * (10 * 1024 * 1024)) + + import subprocess + + loop_dev = ( + subprocess.run( + ["losetup", "--find", "--show", str(img)], + capture_output=True, + text=True, + check=True, + ).stdout.strip() + ) + + try: + removed = _losetup_detach(loop_dev) + assert loop_dev in removed + assert not pathlib.Path(loop_dev).exists() + except Exception: + subprocess.run(["losetup", "--detach", loop_dev], check=False) + raise + + def test_attach_and_detach_roundtrip(self, tmp_path): + img = tmp_path / "disk.img" + img.write_bytes(b"\0" * (10 * 1024 * 1024)) + + devices = _losetup_attach(img) + loop_dev = devices[0] + assert pathlib.Path(loop_dev).exists() + + removed = _losetup_detach(loop_dev) + assert loop_dev in removed + assert not pathlib.Path(loop_dev).exists() diff --git a/tests/integration/services/test_provider.py b/tests/integration/services/test_provider.py new file mode 100644 index 00000000..5ca25fb4 --- /dev/null +++ b/tests/integration/services/test_provider.py @@ -0,0 +1,329 @@ +# Copyright 2026 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Integration tests for Provider.instance with a live losetup server thread.""" + +import contextlib +import http.client +import json +import os +import pathlib +import socket +import threading +import urllib.parse +from collections.abc import Generator +from unittest.mock import MagicMock, patch + +import pytest +from craft_application import AppMetadata, ServiceFactory +from craft_application.services.provider import ProviderService +from craft_providers.lxd import LXDInstance, LXDProvider + +from imagecraft.losetup.server._server import _RequestHandler + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_server_socket(path: pathlib.Path) -> socket.socket: + """Create and bind a listening Unix socket at *path*.""" + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(str(path)) + os.chmod(path, 0o666) + sock.listen(1) + return sock + + +class _LoopServerThread(threading.Thread): + """Accept connections and dispatch to _RequestHandler in a background thread.""" + + def __init__(self, sock: socket.socket, project: str, container: str) -> None: + super().__init__(daemon=True) + self._sock = sock + self._project = project + self._container = container + self._stop_event = threading.Event() + self._sock.settimeout(0.5) + + def run(self) -> None: + while not self._stop_event.is_set(): + try: + conn, addr = self._sock.accept() + except TimeoutError: + continue + try: + _RequestHandler(self._project, self._container, conn, addr, None) + finally: + conn.close() + + def stop(self) -> None: + self._stop_event.set() + self.join(timeout=3) + + +def _http_post(sock_path: pathlib.Path, endpoint: str, path: str) -> dict: + """Send a POST request to the loop server and return the parsed JSON.""" + query = urllib.parse.urlencode({"path": path}) + conn = http.client.HTTPConnection("loopserver") + conn.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + conn.sock.connect(str(sock_path)) + conn.request("POST", f"{endpoint}?{query}") + return json.loads(conn.getresponse().read()) + + +def _make_provider_service( + mock_provider: object, + mock_services: MagicMock, + mock_app: MagicMock, +) -> "imagecraft.services.provider.Provider": + """Construct a Provider service with the given provider pre-cached.""" + from imagecraft.services.provider import Provider + + svc = Provider(mock_app, mock_services, work_dir=pathlib.Path("/tmp")) + svc._provider = mock_provider + return svc + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def losetup_socket_dir(tmp_path: pathlib.Path) -> pathlib.Path: + d = tmp_path / "snap_data" / "losetup" + d.mkdir(parents=True) + return d + + +@pytest.fixture +def loop_server( + losetup_socket_dir: pathlib.Path, +) -> Generator[tuple[pathlib.Path, _LoopServerThread], None, None]: + """Start a _LoopServerThread on a temp socket. Yields (socket_path, thread).""" + sock_path = losetup_socket_dir / "sock" + srv_sock = _make_server_socket(sock_path) + thread = _LoopServerThread(srv_sock, project="imagecraft", container="test-container") + thread.start() + yield sock_path, thread + thread.stop() + srv_sock.close() + + +@pytest.fixture +def mock_app() -> MagicMock: + app = MagicMock(spec=AppMetadata) + app.name = "imagecraft" + return app + + +@pytest.fixture +def mock_services() -> MagicMock: + svc = MagicMock(spec=ServiceFactory) + project = MagicMock() + project.name = "test-project" + svc.get.return_value.get.return_value = project + return svc + + +# --------------------------------------------------------------------------- +# Tests: Provider.instance losetup mount behaviour +# --------------------------------------------------------------------------- + + +@pytest.fixture +def mock_lxd_inst(): + """A mock pylxd instance (returned by _client.instances.get()).""" + inst = MagicMock() + inst.devices = {} + return inst + + +@pytest.fixture +def mock_lxd_provider_instance(mock_lxd_inst): + """A mock LXDInstance with a wired-up pylxd _client.""" + m = MagicMock(spec=LXDInstance) + m.instance_name = "test-container" + m._client = MagicMock() + m._client.instances.get.return_value = mock_lxd_inst + return m + + +@contextlib.contextmanager +def _fake_super_instance(self, build_info, *, work_dir, **kwargs): + """Stub for ProviderService.instance that yields a fresh MagicMock.""" + yield MagicMock() + + +@pytest.mark.requires_root +def test_provider_instance_mounts_losetup_for_lxd( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, + loop_server: tuple[pathlib.Path, _LoopServerThread], + losetup_socket_dir: pathlib.Path, + mock_app: MagicMock, + mock_services: MagicMock, + mock_lxd_inst: MagicMock, + mock_lxd_provider_instance: MagicMock, +) -> None: + """Provider.instance mounts $SNAP_DATA/losetup when using the LXD provider.""" + snap_data = losetup_socket_dir.parent + monkeypatch.setenv("SNAP_DATA", str(snap_data)) + + provider_svc = _make_provider_service( + MagicMock(spec=LXDProvider), mock_services, mock_app + ) + + @contextlib.contextmanager + def fake_super_with_lxd_instance(self, build_info, *, work_dir, **kwargs): + yield mock_lxd_provider_instance + + with patch.object(ProviderService, "instance", fake_super_with_lxd_instance): + with provider_svc.instance(MagicMock(), work_dir=tmp_path / "work"): + pass + + expected_host = snap_data / "losetup" + assert expected_host.exists() + assert "disk-dev-losetup" in mock_lxd_inst.devices + device = mock_lxd_inst.devices["disk-dev-losetup"] + assert device["type"] == "disk" + assert device["path"] == "/dev/losetup" + assert device["source"] == str(expected_host) + assert device["shift"] == "true" + mock_lxd_inst.save.assert_called_once_with(wait=True) + + +def test_provider_instance_skips_losetup_when_no_snap_data( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, + mock_app: MagicMock, + mock_services: MagicMock, + mock_lxd_inst: MagicMock, + mock_lxd_provider_instance: MagicMock, +) -> None: + """Provider.instance does NOT mount losetup when SNAP_DATA is unset.""" + monkeypatch.delenv("SNAP_DATA", raising=False) + + provider_svc = _make_provider_service( + MagicMock(spec=LXDProvider), mock_services, mock_app + ) + + @contextlib.contextmanager + def fake_super_with_lxd_instance(self, build_info, *, work_dir, **kwargs): + yield mock_lxd_provider_instance + + with patch.object(ProviderService, "instance", fake_super_with_lxd_instance): + with provider_svc.instance(MagicMock(), work_dir=tmp_path / "work"): + pass + + mock_lxd_inst.save.assert_not_called() + + +def test_provider_instance_skips_losetup_for_non_lxd( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, + mock_app: MagicMock, + mock_services: MagicMock, + mock_lxd_inst: MagicMock, + mock_lxd_provider_instance: MagicMock, +) -> None: + """Provider.instance does NOT mount losetup for non-LXD providers.""" + from craft_providers.multipass import MultipassProvider + + monkeypatch.setenv("SNAP_DATA", str(tmp_path / "snap_data")) + + provider_svc = _make_provider_service( + MagicMock(spec=MultipassProvider), mock_services, mock_app + ) + + @contextlib.contextmanager + def fake_super_with_lxd_instance(self, build_info, *, work_dir, **kwargs): + yield mock_lxd_provider_instance + + with patch.object(ProviderService, "instance", fake_super_with_lxd_instance): + with provider_svc.instance(MagicMock(), work_dir=tmp_path / "work"): + pass + + mock_lxd_inst.save.assert_not_called() + + +# --------------------------------------------------------------------------- +# Test: full loopserver attach/detach round-trip via _LoopServerThread +# --------------------------------------------------------------------------- + + +@pytest.mark.requires_root +def test_loopserver_attach_detach_round_trip( + tmp_path: pathlib.Path, + loop_server: tuple[pathlib.Path, _LoopServerThread], +) -> None: + """Full HTTP round-trip: attach an image, verify devices added, then detach.""" + import imagecraft.losetup.server._lxd as _lxd + from imagecraft.pack import gptutil + from unittest.mock import MagicMock + + sock_path, _ = loop_server + + # Create a minimal GPT image to attach. + image_path = tmp_path / "test.img" + gptutil.create_empty_gpt_image( + imagepath=image_path, + sector_size=512, + layout=MagicMock(structure=[], volume_schema=None), + ) + + # Fake LXD device state so we don't need a real daemon. + fake_devices: dict = {} + + def fake_lxd_get(path: str) -> dict: + return {"metadata": {"expanded_devices": dict(fake_devices)}} + + def fake_lxd_patch(path: str, body: dict) -> dict: + for name, dev in body.get("devices", {}).items(): + if dev is None: + fake_devices.pop(name, None) + else: + fake_devices[name] = dev + return {} + + with ( + patch.object(_lxd, "lxd_get", side_effect=fake_lxd_get), + patch.object(_lxd, "lxd_patch", side_effect=fake_lxd_patch), + patch( + "imagecraft.losetup.server._server.convert_container_path_to_host_path", + return_value=image_path, + ), + ): + # Attach + resp = _http_post(sock_path, "/1.0/attach", "/root/test.img") + assert resp["status_code"] == 200, resp + attached = resp["metadata"] + assert attached, "expected at least the loop device in metadata" + loop_alias = attached[0] + assert loop_alias.startswith("/dev/imagecraft-loop") + + # Loop device should appear in the fake device table. + assert any(dev.get("path") == loop_alias for dev in fake_devices.values()) + + # Detach + resp = _http_post(sock_path, "/1.0/detach", loop_alias) + assert resp["status_code"] == 200, resp + assert loop_alias in resp["metadata"] + + # All unix-block devices should be removed. + assert not any( + dev.get("type") == "unix-block" for dev in fake_devices.values() + ) diff --git a/tests/unit/losetup/__init__.py b/tests/unit/losetup/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/losetup/server/__init__.py b/tests/unit/losetup/server/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/losetup/server/test_cgroup.py b/tests/unit/losetup/server/test_cgroup.py new file mode 100644 index 00000000..bd72b254 --- /dev/null +++ b/tests/unit/losetup/server/test_cgroup.py @@ -0,0 +1,80 @@ +# Copyright 2026 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Unit tests for imagecraft/losetup/server/_cgroup.py.""" + +import pytest + +from imagecraft.losetup.server._cgroup import parse_lxd_location + + +class TestParseLxdLocation: + @pytest.mark.parametrize( + "cgroup, expected", + [ + # Happy path: project_container + ( + "0::/lxc.payload.snapcraft_snapcraft-imagecraft-amd64-40774607" + "/user.slice/user@0.service/init.scope\n", + ("snapcraft", "snapcraft-imagecraft-amd64-40774607"), + ), + # Root cgroup – not in a container + ("0::/\n", None), + # User slice – no lxc.payload component + ("0::/user.slice/user@1000.service\n", None), + # Empty string + ("", None), + # No 0:: line at all + ("1::/some/path\n2::/other\n", None), + # Exact imagecraft project + ( + "0::/lxc.payload.imagecraft_my-container\n", + ("imagecraft", "my-container"), + ), + ], + ) + def test_parse_various_inputs(self, cgroup, expected): + assert parse_lxd_location(cgroup) == expected + + def test_innermost_wins_with_nested_containers(self): + """Walk in reverse so the innermost lxc.payload wins.""" + cgroup = ( + "0::/lxc.payload.outer_cont1" + "/lxc.payload.inner_cont2" + "/some.scope\n" + ) + assert parse_lxd_location(cgroup) == ("inner", "cont2") + + def test_picks_0_double_colon_line_only(self): + """Lines that don't start with '0::' are ignored.""" + cgroup = "0::/some/path\n1::/other\n" + # "some/path" has no lxc.payload component → None + assert parse_lxd_location(cgroup) is None + + def test_multiple_lines_uses_0_line(self): + """When multiple cgroup lines are present, 0:: is the one parsed.""" + cgroup = ( + "1::/lxc.payload.sneaky_container\n" + "0::/lxc.payload.real_mybox\n" + ) + assert parse_lxd_location(cgroup) == ("real", "mybox") + + def test_no_underscore_separator_returns_none(self): + """If the component after lxc.payload has no underscore, skip it.""" + cgroup = "0::/lxc.payload.nounderscorehere\n" + assert parse_lxd_location(cgroup) is None + + def test_container_name_may_contain_hyphens(self): + cgroup = "0::/lxc.payload.myproject_my-long-container-name-01\n" + assert parse_lxd_location(cgroup) == ("myproject", "my-long-container-name-01") diff --git a/tests/unit/losetup/server/test_lxd.py b/tests/unit/losetup/server/test_lxd.py new file mode 100644 index 00000000..64d4b561 --- /dev/null +++ b/tests/unit/losetup/server/test_lxd.py @@ -0,0 +1,201 @@ +# Copyright 2026 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Unit tests for imagecraft/losetup/server/_lxd.py.""" + +import pathlib + +import pytest + +from imagecraft.losetup.server._lxd import ( + convert_container_path_to_host_path, + find_free_loop_slot, +) + + +# ─── find_free_loop_slot ────────────────────────────────────────────────────── + + +class TestFindFreeLoopSlot: + @pytest.mark.parametrize( + "devices, expected", + [ + # Empty → slot 0 + ({}, 0), + # Slot 0 taken (including a partition device) → slot 1 + ( + { + "imagecraft-loop0": {"type": "unix-block"}, + "imagecraft-loop0p1": {"type": "unix-block"}, + }, + 1, + ), + # Gap at slot 1 → slot 1 + ( + { + "imagecraft-loop0": {"type": "unix-block"}, + "imagecraft-loop2": {"type": "unix-block"}, + }, + 1, + ), + # Non-imagecraft devices are ignored + ({"other-device": {"type": "disk"}}, 0), + # Mixed: imagecraft slot 0, other devices → still slot 1 + ( + { + "imagecraft-loop0": {"type": "unix-block"}, + "eth0": {"type": "nic"}, + }, + 1, + ), + # Two consecutive slots taken → slot 2 + ( + { + "imagecraft-loop0": {}, + "imagecraft-loop1": {}, + }, + 2, + ), + ], + ) + def test_find_free_slot(self, devices, expected): + assert find_free_loop_slot(devices) == expected + + +# ─── convert_container_path_to_host_path ────────────────────────────────────── + + +class TestConvertContainerPathToHostPath: + def _make_devices(self, *entries): + """Build an expanded_devices dict from (name, path, source) tuples.""" + return { + name: {"type": "disk", "path": path, "source": source} + for name, path, source in entries + } + + def _mock_lxd_get(self, mocker, devices: dict): + mocker.patch( + "imagecraft.losetup.server._lxd.lxd_get", + return_value={"metadata": {"expanded_devices": devices}}, + ) + + def test_simple_prefix_match(self, mocker): + devices = self._make_devices( + ("disk-root", "/root/project", "/home/user/project") + ) + self._mock_lxd_get(mocker, devices) + + result = convert_container_path_to_host_path( + "myproject", "mycontainer", "/root/project/disk.img" + ) + + assert result == pathlib.Path("/home/user/project/disk.img") + + def test_exact_path_match(self, mocker): + devices = self._make_devices( + ("disk-root", "/root/project", "/home/user/project") + ) + self._mock_lxd_get(mocker, devices) + + result = convert_container_path_to_host_path( + "myproject", "mycontainer", "/root/project" + ) + + assert result == pathlib.Path("/home/user/project") + + def test_longest_prefix_wins(self, mocker): + """When two mounts overlap, the longer (more specific) one is chosen.""" + devices = { + "disk-root": {"type": "disk", "path": "/root", "source": "/host/root"}, + "disk-project": { + "type": "disk", + "path": "/root/project", + "source": "/home/user/project", + }, + } + self._mock_lxd_get(mocker, devices) + + result = convert_container_path_to_host_path( + "myproject", "mycontainer", "/root/project/foo" + ) + + assert result == pathlib.Path("/home/user/project/foo") + + def test_raises_when_no_matching_device(self, mocker): + devices = self._make_devices( + ("disk-other", "/other/path", "/host/other") + ) + self._mock_lxd_get(mocker, devices) + + with pytest.raises(ValueError, match="not under any sourced device"): + convert_container_path_to_host_path( + "myproject", "mycontainer", "/root/project/disk.img" + ) + + def test_non_disk_device_is_ignored(self, mocker): + devices = { + "eth0": {"type": "nic", "path": "/root/project", "source": "/host/proj"}, + "disk-root": {"type": "disk", "path": "/root", "source": "/host/root"}, + } + self._mock_lxd_get(mocker, devices) + + result = convert_container_path_to_host_path( + "myproject", "mycontainer", "/root/project/file" + ) + + # Uses /root mount, not the nic device + assert result == pathlib.Path("/host/root/project/file") + + def test_disk_without_source_is_ignored(self, mocker): + devices = { + "disk-nosource": {"type": "disk", "path": "/root/project"}, + "disk-root": {"type": "disk", "path": "/root", "source": "/host/root"}, + } + self._mock_lxd_get(mocker, devices) + + result = convert_container_path_to_host_path( + "myproject", "mycontainer", "/root/project/file" + ) + + # disk-nosource has no "source" so it's ignored; falls back to /root + assert result == pathlib.Path("/host/root/project/file") + + def test_lxd_get_called_with_correct_path(self, mocker): + mock_get = mocker.patch( + "imagecraft.losetup.server._lxd.lxd_get", + return_value={ + "metadata": { + "expanded_devices": { + "d": {"type": "disk", "path": "/", "source": "/host"} + } + } + }, + ) + + convert_container_path_to_host_path("proj", "cont", "/file") + + mock_get.assert_called_once_with("/1.0/instances/cont?project=proj") + + def test_normpath_applied_to_container_path(self, mocker): + """Trailing slashes and dots are normalised before matching.""" + devices = self._make_devices( + ("disk-root", "/root/project", "/host/project") + ) + self._mock_lxd_get(mocker, devices) + + result = convert_container_path_to_host_path( + "p", "c", "/root/project/" + ) + + assert result == pathlib.Path("/host/project") diff --git a/tests/unit/losetup/server/test_server.py b/tests/unit/losetup/server/test_server.py new file mode 100644 index 00000000..0da3e515 --- /dev/null +++ b/tests/unit/losetup/server/test_server.py @@ -0,0 +1,373 @@ +# Copyright 2026 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Unit tests for imagecraft/losetup/server/_server.py.""" + +import json +import pathlib +import socket +from unittest.mock import MagicMock, call + +import pytest + +from imagecraft.losetup.server._server import ( + _RequestHandler, + _handle_attach, + _handle_detach, +) + + +# ─── helpers ────────────────────────────────────────────────────────────────── + + +def _make_http_request( + method: str, + path: str, + version: str = "HTTP/1.0", + extra_headers: str = "", +) -> bytes: + return ( + f"{method} {path} {version}\r\n" + f"Host: localhost\r\n" + f"{extra_headers}" + f"\r\n" + ).encode() + + +def _run_handler( + project: str, container: str, request_bytes: bytes +) -> tuple[int, dict]: + """ + Feed *request_bytes* into a _RequestHandler via a real socketpair. + Returns (status_code, parsed_json_body). + """ + client, server = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) + try: + client.sendall(request_bytes) + _RequestHandler(project, container, server, ("127.0.0.1", 0), None) + finally: + server.close() + + raw = b"" + client.settimeout(2.0) + try: + while True: + chunk = client.recv(4096) + if not chunk: + break + raw += chunk + except socket.timeout: + pass + finally: + client.close() + + # Parse HTTP response: status line + headers + blank line + body + header_end = raw.find(b"\r\n\r\n") + header_section = raw[:header_end].decode() + body = raw[header_end + 4 :] + status_line = header_section.split("\r\n")[0] + status_code = int(status_line.split()[1]) + return status_code, json.loads(body) + + +# ─── _handle_attach ─────────────────────────────────────────────────────────── + + +class TestHandleAttach: + def _setup( + self, + mocker, + loop_dev="/dev/loop133", + partitions=("loop133p1", "loop133p2"), + existing_devices=None, + ): + mocker.patch( + "imagecraft.losetup.server._server.convert_container_path_to_host_path", + return_value=pathlib.Path("/host/disk.img"), + ) + + losetup_result = MagicMock() + losetup_result.stdout = loop_dev + "\n" + part_children = [{"name": p, "type": "part"} for p in partitions] + loop_name = pathlib.Path(loop_dev).name + lsblk_result = MagicMock() + lsblk_result.stdout = json.dumps( + { + "blockdevices": [ + {"name": loop_name, "type": "loop", "children": part_children} + ] + } + ) + mock_run = mocker.patch( + "imagecraft.losetup.server._server.subprocess.run", + side_effect=[losetup_result, lsblk_result], + ) + + current_devs = existing_devices or {} + mocker.patch( + "imagecraft.losetup.server._server.lxd_get", + return_value={"metadata": {"expanded_devices": current_devs}}, + ) + mock_patch = mocker.patch("imagecraft.losetup.server._server.lxd_patch") + + return mock_run, mock_patch + + def test_returns_device_paths(self, mocker): + self._setup(mocker) + + result = _handle_attach("imagecraft", "my-container", "/root/disk.img") + + assert "/dev/imagecraft-loop0" in result + assert "/dev/imagecraft-loop0p1" in result + assert "/dev/imagecraft-loop0p2" in result + + def test_uses_slot_zero_when_no_existing_devices(self, mocker): + self._setup(mocker, existing_devices={}) + + result = _handle_attach("imagecraft", "my-container", "/root/disk.img") + + assert all("loop0" in p for p in result) + + def test_uses_next_free_slot_when_slot_zero_taken(self, mocker): + existing = { + "imagecraft-loop0": {"type": "unix-block", "path": "/dev/imagecraft-loop0"}, + "imagecraft-loop0p1": { + "type": "unix-block", + "path": "/dev/imagecraft-loop0p1", + }, + } + self._setup(mocker, existing_devices=existing) + + result = _handle_attach("imagecraft", "my-container", "/root/disk.img") + + assert any("loop1" in p for p in result) + assert not any("loop0p" in p for p in result) or any( + "loop1" in p for p in result + ) + + def test_patches_lxd_with_new_devices(self, mocker): + _, mock_patch = self._setup(mocker, partitions=["loop133p1"]) + + _handle_attach("imagecraft", "my-container", "/root/disk.img") + + mock_patch.assert_called_once() + call_args = mock_patch.call_args + devices_arg = call_args[0][1]["devices"] + names = list(devices_arg.keys()) + assert any("imagecraft-loop" in n for n in names) + + def test_losetup_called_with_host_path(self, mocker): + mock_run, _ = self._setup(mocker, partitions=[]) + + _handle_attach("imagecraft", "my-container", "/root/disk.img") + + losetup_cmd = mock_run.call_args_list[0][0][0] + assert losetup_cmd == [ + "losetup", + "--find", + "--show", + "--partscan", + "/host/disk.img", + ] + + def test_no_partitions(self, mocker): + self._setup(mocker, partitions=[]) + + result = _handle_attach("imagecraft", "my-container", "/root/disk.img") + + assert result == ["/dev/imagecraft-loop0"] + + +# ─── _handle_detach ─────────────────────────────────────────────────────────── + + +class TestHandleDetach: + def _make_devices(self, loop_alias="imagecraft-loop0", loop_dev="/dev/loop133"): + return { + loop_alias: { + "type": "unix-block", + "path": f"/dev/{loop_alias}", + "source": loop_dev, + }, + f"{loop_alias}p1": { + "type": "unix-block", + "path": f"/dev/{loop_alias}p1", + "source": f"{loop_dev}p1", + }, + f"{loop_alias}p2": { + "type": "unix-block", + "path": f"/dev/{loop_alias}p2", + "source": f"{loop_dev}p2", + }, + } + + def _setup(self, mocker, devices=None): + if devices is None: + devices = self._make_devices() + mocker.patch( + "imagecraft.losetup.server._server.lxd_get", + return_value={"metadata": {"expanded_devices": devices}}, + ) + mock_patch = mocker.patch("imagecraft.losetup.server._server.lxd_patch") + mock_run = mocker.patch("imagecraft.losetup.server._server.subprocess.run") + return mock_patch, mock_run + + def test_returns_removed_paths(self, mocker): + mock_patch, _ = self._setup(mocker) + + result = _handle_detach( + "imagecraft", "my-container", "/dev/imagecraft-loop0" + ) + + assert set(result) == { + "/dev/imagecraft-loop0", + "/dev/imagecraft-loop0p1", + "/dev/imagecraft-loop0p2", + } + + def test_runs_losetup_detach_on_host_device(self, mocker): + _, mock_run = self._setup(mocker) + + _handle_detach("imagecraft", "my-container", "/dev/imagecraft-loop0") + + mock_run.assert_called_once_with( + ["losetup", "--detach", "/dev/loop133"], check=True + ) + + def test_patches_lxd_with_none_to_remove_devices(self, mocker): + mock_patch, _ = self._setup(mocker) + + _handle_detach("imagecraft", "my-container", "/dev/imagecraft-loop0") + + mock_patch.assert_called_once() + devices_arg = mock_patch.call_args[0][1]["devices"] + # All removed devices mapped to None + assert all(v is None for v in devices_arg.values()) + assert "imagecraft-loop0" in devices_arg + + def test_raises_when_path_not_found(self, mocker): + self._setup(mocker) + + with pytest.raises(ValueError, match="no unix-block device with path"): + _handle_detach( + "imagecraft", "my-container", "/dev/imagecraft-loop99" + ) + + def test_removes_partition_devices_with_loop_prefix(self, mocker): + """Partition sources like /dev/loop133p1 are matched by prefix.""" + mock_patch, _ = self._setup(mocker) + + _handle_detach("imagecraft", "my-container", "/dev/imagecraft-loop0") + + devices_removed = mock_patch.call_args[0][1]["devices"] + # Both the main device and its partitions should be removed + assert "imagecraft-loop0p1" in devices_removed + assert "imagecraft-loop0p2" in devices_removed + + def test_does_not_remove_unrelated_devices(self, mocker): + devices = { + **self._make_devices("imagecraft-loop0", "/dev/loop133"), + "imagecraft-loop1": { + "type": "unix-block", + "path": "/dev/imagecraft-loop1", + "source": "/dev/loop134", + }, + } + mock_patch, _ = self._setup(mocker, devices=devices) + + _handle_detach("imagecraft", "my-container", "/dev/imagecraft-loop0") + + devices_removed = mock_patch.call_args[0][1]["devices"] + assert "imagecraft-loop1" not in devices_removed + + +# ─── _RequestHandler (via socket pair) ──────────────────────────────────────── + + +class TestRequestHandler: + def test_valid_attach_returns_200(self, mocker): + mocker.patch( + "imagecraft.losetup.server._server._handle_attach", + return_value=["/dev/imagecraft-loop0", "/dev/imagecraft-loop0p1"], + ) + + req = _make_http_request("POST", "/1.0/attach?path=/root/disk.img") + code, body = _run_handler("imagecraft", "my-container", req) + + assert code == 200 + assert body["metadata"] == ["/dev/imagecraft-loop0", "/dev/imagecraft-loop0p1"] + assert body["status_code"] == 200 + + def test_valid_detach_returns_200(self, mocker): + mocker.patch( + "imagecraft.losetup.server._server._handle_detach", + return_value=["/dev/imagecraft-loop0"], + ) + + req = _make_http_request("POST", "/1.0/detach?path=/dev/imagecraft-loop0") + code, body = _run_handler("imagecraft", "my-container", req) + + assert code == 200 + assert body["metadata"] == ["/dev/imagecraft-loop0"] + + def test_missing_path_param_returns_400(self, mocker): + req = _make_http_request("POST", "/1.0/attach") + code, body = _run_handler("imagecraft", "my-container", req) + + assert code == 400 + assert "path" in body["error"] + + def test_unknown_endpoint_returns_404(self, mocker): + req = _make_http_request("POST", "/1.0/unknown?path=/x") + code, body = _run_handler("imagecraft", "my-container", req) + + assert code == 404 + + def test_handler_exception_returns_500(self, mocker): + mocker.patch( + "imagecraft.losetup.server._server._handle_attach", + side_effect=RuntimeError("unexpected failure"), + ) + + req = _make_http_request("POST", "/1.0/attach?path=/root/disk.img") + code, body = _run_handler("imagecraft", "my-container", req) + + assert code == 500 + assert "unexpected failure" in body["error"] + + def test_attach_receives_correct_args(self, mocker): + mock_attach = mocker.patch( + "imagecraft.losetup.server._server._handle_attach", + return_value=[], + ) + + req = _make_http_request("POST", "/1.0/attach?path=/root/disk.img") + _run_handler("imagecraft", "my-container", req) + + mock_attach.assert_called_once_with( + "imagecraft", "my-container", "/root/disk.img" + ) + + def test_detach_receives_correct_args(self, mocker): + mock_detach = mocker.patch( + "imagecraft.losetup.server._server._handle_detach", + return_value=[], + ) + + req = _make_http_request("POST", "/1.0/detach?path=/dev/imagecraft-loop0") + _run_handler("imagecraft", "my-container", req) + + mock_detach.assert_called_once_with( + "imagecraft", "my-container", "/dev/imagecraft-loop0" + ) diff --git a/tests/unit/losetup/test_client.py b/tests/unit/losetup/test_client.py new file mode 100644 index 00000000..4e3f4980 --- /dev/null +++ b/tests/unit/losetup/test_client.py @@ -0,0 +1,451 @@ +# Copyright 2026 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Unit tests for imagecraft/losetup/__init__.py (the losetup client).""" + +import json +import pathlib +import subprocess +from unittest.mock import MagicMock + +import pytest + +import imagecraft.losetup as losetup_module +from imagecraft.losetup import ( + _LXD_GUEST_SOCK, + _LOOPSERVER_SOCK, + _is_lxd_container, + _loopserver_request, + _losetup_attach, + _losetup_detach, + attach, + detach, +) + + +# ─── constants ──────────────────────────────────────────────────────────────── + + +def test_lxd_guest_sock_constant(): + assert _LXD_GUEST_SOCK == "/dev/lxd/sock" + + +def test_loopserver_sock_constant(): + assert _LOOPSERVER_SOCK == "/dev/losetup/sock" + + +# ─── _is_lxd_container ──────────────────────────────────────────────────────── + + +class TestIsLxdContainer: + def test_returns_false_when_sock_missing(self, mocker, tmp_path): + mocker.patch("imagecraft.losetup._LXD_GUEST_SOCK", str(tmp_path / "nonexistent")) + mocker.patch("craft_cli.emit.debug") + assert _is_lxd_container() is False + + def test_returns_true_when_instance_type_container(self, mocker, tmp_path): + sock_path = tmp_path / "sock" + sock_path.touch() + mocker.patch("imagecraft.losetup._LXD_GUEST_SOCK", str(sock_path)) + mocker.patch("craft_cli.emit.debug") + + mock_response = MagicMock() + mock_response.read.return_value = json.dumps( + {"instance_type": "container"} + ).encode() + mock_conn = MagicMock() + mock_conn.getresponse.return_value = mock_response + mocker.patch( + "imagecraft.losetup.http.client.HTTPConnection", return_value=mock_conn + ) + mocker.patch("imagecraft.losetup.socket.socket") + + assert _is_lxd_container() is True + + def test_returns_false_when_instance_type_vm(self, mocker, tmp_path): + sock_path = tmp_path / "sock" + sock_path.touch() + mocker.patch("imagecraft.losetup._LXD_GUEST_SOCK", str(sock_path)) + mocker.patch("craft_cli.emit.debug") + + mock_response = MagicMock() + mock_response.read.return_value = json.dumps( + {"instance_type": "virtual-machine"} + ).encode() + mock_conn = MagicMock() + mock_conn.getresponse.return_value = mock_response + mocker.patch( + "imagecraft.losetup.http.client.HTTPConnection", return_value=mock_conn + ) + mocker.patch("imagecraft.losetup.socket.socket") + + assert _is_lxd_container() is False + + def test_returns_false_when_instance_type_missing(self, mocker, tmp_path): + sock_path = tmp_path / "sock" + sock_path.touch() + mocker.patch("imagecraft.losetup._LXD_GUEST_SOCK", str(sock_path)) + mocker.patch("craft_cli.emit.debug") + + mock_response = MagicMock() + mock_response.read.return_value = json.dumps({}).encode() + mock_conn = MagicMock() + mock_conn.getresponse.return_value = mock_response + mocker.patch( + "imagecraft.losetup.http.client.HTTPConnection", return_value=mock_conn + ) + mocker.patch("imagecraft.losetup.socket.socket") + + assert _is_lxd_container() is False + + def test_returns_false_on_connection_error(self, mocker, tmp_path): + sock_path = tmp_path / "sock" + sock_path.touch() + mocker.patch("imagecraft.losetup._LXD_GUEST_SOCK", str(sock_path)) + mocker.patch("craft_cli.emit.debug") + + mock_sock = MagicMock() + mock_sock.connect.side_effect = ConnectionRefusedError("refused") + mocker.patch("imagecraft.losetup.socket.socket", return_value=mock_sock) + mocker.patch("imagecraft.losetup.http.client.HTTPConnection") + + assert _is_lxd_container() is False + + def test_returns_false_on_json_error(self, mocker, tmp_path): + sock_path = tmp_path / "sock" + sock_path.touch() + mocker.patch("imagecraft.losetup._LXD_GUEST_SOCK", str(sock_path)) + mocker.patch("craft_cli.emit.debug") + + mock_response = MagicMock() + mock_response.read.return_value = b"not-json" + mock_conn = MagicMock() + mock_conn.getresponse.return_value = mock_response + mocker.patch( + "imagecraft.losetup.http.client.HTTPConnection", return_value=mock_conn + ) + mocker.patch("imagecraft.losetup.socket.socket") + + assert _is_lxd_container() is False + + def test_never_raises_on_connection_failure(self, mocker, tmp_path): + """Exceptions during the HTTP call (inside the try block) never propagate.""" + sock_path = tmp_path / "sock" + sock_path.touch() + mocker.patch("imagecraft.losetup._LXD_GUEST_SOCK", str(sock_path)) + mocker.patch("craft_cli.emit.debug") + + mock_conn = MagicMock() + mock_conn.getresponse.side_effect = OSError("pipe broken") + mocker.patch( + "imagecraft.losetup.http.client.HTTPConnection", return_value=mock_conn + ) + mocker.patch("imagecraft.losetup.socket.socket") + + result = _is_lxd_container() + assert result is False + + +# ─── _loopserver_request ────────────────────────────────────────────────────── + + +class TestLoopserverRequest: + def _make_conn(self, mocker, response_data: dict): + mock_response = MagicMock() + mock_response.read.return_value = json.dumps(response_data).encode() + mock_conn = MagicMock() + mock_conn.getresponse.return_value = mock_response + mocker.patch( + "imagecraft.losetup.http.client.HTTPConnection", return_value=mock_conn + ) + mocker.patch("imagecraft.losetup.socket.socket") + return mock_conn + + def test_returns_metadata_on_success(self, mocker): + expected = ["/dev/imagecraft-loop0", "/dev/imagecraft-loop0p1"] + self._make_conn(mocker, {"status_code": 200, "metadata": expected}) + + result = _loopserver_request("/1.0/attach", "/root/disk.img") + + assert result == expected + + def test_raises_on_error_code(self, mocker): + self._make_conn( + mocker, + {"error_code": 500, "error": "losetup failed"}, + ) + + with pytest.raises(RuntimeError, match="losetup failed"): + _loopserver_request("/1.0/attach", "/root/disk.img") + + def test_raises_with_fallback_message_when_no_error_field(self, mocker): + self._make_conn(mocker, {"error_code": 500}) + + with pytest.raises(RuntimeError, match="unknown error from loopserver"): + _loopserver_request("/1.0/attach", "/root/disk.img") + + def test_connects_to_loopserver_sock(self, mocker): + mock_sock = MagicMock() + mocker.patch("imagecraft.losetup.socket.socket", return_value=mock_sock) + mock_conn = MagicMock() + mock_conn.getresponse.return_value.read.return_value = json.dumps( + {"metadata": []} + ).encode() + mocker.patch( + "imagecraft.losetup.http.client.HTTPConnection", return_value=mock_conn + ) + + _loopserver_request("/1.0/attach", "/some/path") + + mock_sock.connect.assert_called_once_with(_LOOPSERVER_SOCK) + + def test_path_encoded_in_query(self, mocker): + mock_sock = MagicMock() + mocker.patch("imagecraft.losetup.socket.socket", return_value=mock_sock) + mock_conn = MagicMock() + mock_conn.getresponse.return_value.read.return_value = json.dumps( + {"metadata": []} + ).encode() + mocker.patch( + "imagecraft.losetup.http.client.HTTPConnection", return_value=mock_conn + ) + + _loopserver_request("/1.0/attach", "/root/my image.img") + + url = mock_conn.request.call_args[0][1] + assert "path=%2Froot%2Fmy+image.img" in url or "path=%2Froot%2Fmy%20image.img" in url + + +# ─── _losetup_attach ────────────────────────────────────────────────────────── + + +class TestLosetupAttach: + def _make_run(self, mocker, loop_dev="/dev/loop8", partitions=None): + partitions = partitions or [] + children = [{"name": f"loop8{p}", "type": "part"} for p in partitions] + lsblk_output = json.dumps( + { + "blockdevices": [ + {"name": "loop8", "type": "loop", "children": children} + ] + } + ) + losetup_result = MagicMock() + losetup_result.stdout = loop_dev + "\n" + lsblk_result = MagicMock() + lsblk_result.stdout = lsblk_output + + return mocker.patch( + "imagecraft.losetup.subprocess.run", + side_effect=[losetup_result, lsblk_result], + ) + + def test_returns_loop_device_no_partitions(self, mocker, tmp_path): + img = tmp_path / "disk.img" + self._make_run(mocker, loop_dev="/dev/loop8", partitions=[]) + + result = _losetup_attach(img) + + assert result == ["/dev/loop8"] + + def test_returns_loop_and_partitions(self, mocker, tmp_path): + img = tmp_path / "disk.img" + self._make_run(mocker, loop_dev="/dev/loop8", partitions=["p1", "p2"]) + + result = _losetup_attach(img) + + assert result == ["/dev/loop8", "/dev/loop8p1", "/dev/loop8p2"] + + def test_calls_losetup_find_show_partscan(self, mocker, tmp_path): + img = tmp_path / "disk.img" + mock_run = self._make_run(mocker, loop_dev="/dev/loop8") + + _losetup_attach(img) + + losetup_call = mock_run.call_args_list[0] + assert losetup_call[0][0] == [ + "losetup", + "--find", + "--show", + "--partscan", + str(img), + ] + + def test_calls_lsblk_on_loop_device(self, mocker, tmp_path): + img = tmp_path / "disk.img" + mock_run = self._make_run(mocker, loop_dev="/dev/loop8") + + _losetup_attach(img) + + lsblk_call = mock_run.call_args_list[1] + cmd = lsblk_call[0][0] + assert "lsblk" in cmd + assert "/dev/loop8" in cmd + + def test_ignores_non_part_children(self, mocker, tmp_path): + img = tmp_path / "disk.img" + lsblk_output = json.dumps( + { + "blockdevices": [ + { + "name": "loop8", + "type": "loop", + "children": [ + {"name": "loop8p1", "type": "part"}, + {"name": "dm-0", "type": "dm"}, # not a partition + ], + } + ] + } + ) + losetup_result = MagicMock() + losetup_result.stdout = "/dev/loop8\n" + lsblk_result = MagicMock() + lsblk_result.stdout = lsblk_output + mocker.patch( + "imagecraft.losetup.subprocess.run", + side_effect=[losetup_result, lsblk_result], + ) + + result = _losetup_attach(img) + + assert "/dev/dm-0" not in result + assert "/dev/loop8p1" in result + + +# ─── _losetup_detach ────────────────────────────────────────────────────────── + + +class TestLosetupDetach: + def _setup(self, mocker, device="/dev/loop8", partitions=None): + partitions = partitions or [] + children = [{"name": f"loop8{p}", "type": "part"} for p in partitions] + lsblk_output = json.dumps( + {"blockdevices": [{"name": "loop8", "type": "loop", "children": children}]} + ) + lsblk_result = MagicMock() + lsblk_result.stdout = lsblk_output + detach_result = MagicMock() + + return mocker.patch( + "imagecraft.losetup.subprocess.run", + side_effect=[lsblk_result, detach_result], + ) + + def test_returns_device_no_partitions(self, mocker): + self._setup(mocker, device="/dev/loop8", partitions=[]) + + result = _losetup_detach("/dev/loop8") + + assert result == ["/dev/loop8"] + + def test_returns_device_and_partitions(self, mocker): + self._setup(mocker, device="/dev/loop8", partitions=["p1", "p2"]) + + result = _losetup_detach("/dev/loop8") + + assert result == ["/dev/loop8", "/dev/loop8p1", "/dev/loop8p2"] + + def test_calls_losetup_detach(self, mocker): + mock_run = self._setup(mocker) + + _losetup_detach("/dev/loop8") + + detach_call = mock_run.call_args_list[1] + assert detach_call[0][0] == ["losetup", "--detach", "/dev/loop8"] + + def test_ignores_non_part_children(self, mocker): + lsblk_output = json.dumps( + { + "blockdevices": [ + { + "name": "loop8", + "type": "loop", + "children": [ + {"name": "loop8p1", "type": "part"}, + {"name": "dm-0", "type": "dm"}, + ], + } + ] + } + ) + lsblk_result = MagicMock() + lsblk_result.stdout = lsblk_output + mocker.patch( + "imagecraft.losetup.subprocess.run", + side_effect=[lsblk_result, MagicMock()], + ) + + result = _losetup_detach("/dev/loop8") + + assert "/dev/dm-0" not in result + + +# ─── attach / detach (high-level) ───────────────────────────────────────────── + + +class TestAttach: + def test_delegates_to_loopserver_when_lxd_container(self, mocker): + mocker.patch("imagecraft.losetup._is_lxd_container", return_value=True) + mock_req = mocker.patch( + "imagecraft.losetup._loopserver_request", + return_value=["/dev/imagecraft-loop0"], + ) + mocker.patch("craft_cli.emit.debug") + + result = attach(pathlib.Path("/root/disk.img")) + + mock_req.assert_called_once_with("/1.0/attach", "/root/disk.img") + assert result == ["/dev/imagecraft-loop0"] + + def test_delegates_to_losetup_when_not_container(self, mocker): + mocker.patch("imagecraft.losetup._is_lxd_container", return_value=False) + mock_attach = mocker.patch( + "imagecraft.losetup._losetup_attach", + return_value=["/dev/loop8"], + ) + mocker.patch("craft_cli.emit.debug") + + result = attach(pathlib.Path("/root/disk.img")) + + mock_attach.assert_called_once_with(pathlib.Path("/root/disk.img")) + assert result == ["/dev/loop8"] + + +class TestDetach: + def test_delegates_to_loopserver_when_lxd_container(self, mocker): + mocker.patch("imagecraft.losetup._is_lxd_container", return_value=True) + mock_req = mocker.patch( + "imagecraft.losetup._loopserver_request", + return_value=["/dev/imagecraft-loop0"], + ) + mocker.patch("craft_cli.emit.debug") + + result = detach("/dev/imagecraft-loop0") + + mock_req.assert_called_once_with("/1.0/detach", "/dev/imagecraft-loop0") + assert result == ["/dev/imagecraft-loop0"] + + def test_delegates_to_losetup_when_not_container(self, mocker): + mocker.patch("imagecraft.losetup._is_lxd_container", return_value=False) + mock_detach = mocker.patch( + "imagecraft.losetup._losetup_detach", + return_value=["/dev/loop8"], + ) + mocker.patch("craft_cli.emit.debug") + + result = detach("/dev/loop8") + + mock_detach.assert_called_once_with("/dev/loop8") + assert result == ["/dev/loop8"] diff --git a/tests/unit/services/test_image.py b/tests/unit/services/test_image.py index 221d84af..92347c38 100644 --- a/tests/unit/services/test_image.py +++ b/tests/unit/services/test_image.py @@ -12,7 +12,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import subprocess from unittest.mock import MagicMock, patch import pytest @@ -99,95 +98,53 @@ def test_create_images_idempotent(image_service, mock_services, mock_project): def test_attach_images_new(image_service, project_dir, mocker): image_service._images = {"pc": project_dir / ".pc.img.tmp"} - mock_run = mocker.patch("imagecraft.services.image.run") - # Mock _get_all_loop_devices returns empty - mocker.patch.object(image_service, "_get_all_loop_devices", return_value=[]) - - mock_run.return_value.stdout = "/dev/loop8\n" + mock_attach = mocker.patch( + "imagecraft.losetup.attach", + return_value=["/dev/loop8"], + ) with patch("atexit.register") as mock_atexit: devices = image_service.attach_images() assert devices == {"pc": "/dev/loop8"} - mock_run.assert_called_with( - "losetup", - "--find", - "--show", - "--partscan", - str(project_dir / ".pc.img.tmp"), - ) + mock_attach.assert_called_once_with(project_dir / ".pc.img.tmp") mock_atexit.assert_called_once_with(image_service.detach_images) -def test_attach_images_reuse(image_service, project_dir, mocker): - image_path = project_dir / ".pc.img.tmp" - image_path.touch() - image_service._images = {"pc": image_path} - - # Mock existing loop device - mocker.patch.object( - image_service, - "_get_all_loop_devices", - return_value=[{"name": "/dev/loop9", "back-file": str(image_path)}], - ) - - # Mock samefile to return True - mocker.patch("pathlib.Path.samefile", return_value=True) - mock_run = mocker.patch("imagecraft.services.image.run") - - devices = image_service.attach_images() - - assert devices == {"pc": "/dev/loop9"} - mock_run.assert_not_called() # Should not call losetup attach - - -def test_attach_images_stale_inode(image_service, project_dir, mocker): - image_path = project_dir / ".pc.img.tmp" - image_path.touch() - image_service._images = {"pc": image_path} +def test_attach_images_idempotent(image_service, project_dir, mocker): + image_service._images = {"pc": project_dir / ".pc.img.tmp"} - # Mock existing loop device - mocker.patch.object( - image_service, - "_get_all_loop_devices", - return_value=[{"name": "/dev/loop10", "back-file": str(image_path)}], + mock_attach = mocker.patch( + "imagecraft.losetup.attach", + return_value=["/dev/loop8"], ) - # Mock samefile to raise FileNotFoundError (stale inode) - mocker.patch("pathlib.Path.samefile", side_effect=FileNotFoundError) - mock_run = mocker.patch("imagecraft.services.image.run") - mock_run.return_value.stdout = "/dev/loop11\n" - - devices = image_service.attach_images() + first = image_service.attach_images() + second = image_service.attach_images() - assert devices == {"pc": "/dev/loop11"} - # Should detach stale - mock_run.assert_any_call("losetup", "-d", "/dev/loop10") - # Should attach new - mock_run.assert_any_call( - "losetup", "--find", "--show", "--partscan", str(image_path) - ) + assert first == second == {"pc": "/dev/loop8"} + mock_attach.assert_called_once() # only called the first time def test_detach_images_success(image_service, mocker): image_service._loop_devices = {"pc": "/dev/loop8"} - mock_run = mocker.patch("imagecraft.services.image.run") + mock_detach = mocker.patch("imagecraft.losetup.detach") image_service.detach_images() - mock_run.assert_called_once_with("losetup", "-d", "/dev/loop8") + mock_detach.assert_called_once_with("/dev/loop8") assert image_service._loop_devices == {} def test_detach_images_retry(image_service, mocker): image_service._loop_devices = {"pc": "/dev/loop8"} - mock_run = mocker.patch("imagecraft.services.image.run") + mock_detach = mocker.patch("imagecraft.losetup.detach") # Fail twice, then succeed - mock_run.side_effect = [ - subprocess.CalledProcessError(1, "losetup"), - subprocess.CalledProcessError(1, "losetup"), - MagicMock(), + mock_detach.side_effect = [ + RuntimeError("device busy"), + RuntimeError("device busy"), + None, ] mocker.patch("time.monotonic", side_effect=[0, 1, 2, 3, 4]) @@ -195,7 +152,7 @@ def test_detach_images_retry(image_service, mocker): image_service.detach_images() - assert mock_run.call_count == 3 + assert mock_detach.call_count == 3 assert image_service._loop_devices == {} diff --git a/tests/unit/services/test_provider.py b/tests/unit/services/test_provider.py new file mode 100644 index 00000000..a263ed61 --- /dev/null +++ b/tests/unit/services/test_provider.py @@ -0,0 +1,180 @@ +# Copyright 2026 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Unit tests for imagecraft/services/provider.py – Provider.instance override.""" + +import pathlib +from contextlib import contextmanager +from unittest.mock import MagicMock + +import pytest +from craft_application import AppMetadata, ServiceFactory +from craft_application.services.provider import ProviderService +from craft_providers.lxd import LXDInstance, LXDProvider +from craft_providers.multipass import MultipassProvider + +from imagecraft.services.provider import Provider + + +# ─── fixtures ───────────────────────────────────────────────────────────────── + + +@pytest.fixture +def mock_lxd_inst(): + """A mock pylxd instance (returned by _client.instances.get()).""" + inst = MagicMock() + inst.devices = {} + return inst + + +@pytest.fixture +def mock_instance(mock_lxd_inst): + """A mock LXDInstance with a wired-up pylxd _client.""" + m = MagicMock(spec=LXDInstance) + m.instance_name = "test-container" + m._client = MagicMock() + m._client.instances.get.return_value = mock_lxd_inst + return m + + +@pytest.fixture +def provider(tmp_path): + app = MagicMock(spec=AppMetadata) + services = MagicMock(spec=ServiceFactory) + return Provider(app, services, work_dir=tmp_path) + + +# ─── helpers ────────────────────────────────────────────────────────────────── + + +def _patch_super_instance(mocker, mock_inst): + """Patch ProviderService.instance to yield *mock_inst*.""" + + @contextmanager + def fake_instance(self, build_info, *, work_dir, **kwargs): + yield mock_inst + + mocker.patch.object(ProviderService, "instance", fake_instance) + + +# ─── tests ──────────────────────────────────────────────────────────────────── + + +class TestProviderInstance: + def test_lxd_with_snap_data_mounts_losetup( + self, mocker, provider, mock_instance, mock_lxd_inst, tmp_path, monkeypatch + ): + monkeypatch.setenv("SNAP_DATA", str(tmp_path)) + _patch_super_instance(mocker, mock_instance) + mocker.patch.object( + provider, "get_provider", return_value=MagicMock(spec=LXDProvider) + ) + mocker.patch("craft_cli.emit.debug") + + build_info = MagicMock() + with provider.instance(build_info, work_dir=tmp_path) as inst: + assert inst is mock_instance + + assert "disk-dev-losetup" in mock_lxd_inst.devices + device = mock_lxd_inst.devices["disk-dev-losetup"] + assert device["type"] == "disk" + assert device["path"] == "/dev/losetup" + assert device["source"] == str(tmp_path / "losetup") + assert device["shift"] == "true" + mock_lxd_inst.save.assert_called_once_with(wait=True) + + def test_lxd_with_snap_data_creates_losetup_dir( + self, mocker, provider, mock_instance, tmp_path, monkeypatch + ): + monkeypatch.setenv("SNAP_DATA", str(tmp_path)) + _patch_super_instance(mocker, mock_instance) + mocker.patch.object( + provider, "get_provider", return_value=MagicMock(spec=LXDProvider) + ) + mocker.patch("craft_cli.emit.debug") + + build_info = MagicMock() + with provider.instance(build_info, work_dir=tmp_path): + pass + + assert (tmp_path / "losetup").is_dir() + + def test_lxd_without_snap_data_does_not_mount( + self, mocker, provider, mock_instance, mock_lxd_inst, tmp_path, monkeypatch + ): + monkeypatch.delenv("SNAP_DATA", raising=False) + _patch_super_instance(mocker, mock_instance) + mocker.patch.object( + provider, "get_provider", return_value=MagicMock(spec=LXDProvider) + ) + mocker.patch("craft_cli.emit.debug") + + build_info = MagicMock() + with provider.instance(build_info, work_dir=tmp_path) as inst: + assert inst is mock_instance + + mock_lxd_inst.save.assert_not_called() + + def test_non_lxd_provider_does_not_mount( + self, mocker, provider, mock_instance, mock_lxd_inst, tmp_path, monkeypatch + ): + monkeypatch.setenv("SNAP_DATA", str(tmp_path)) + _patch_super_instance(mocker, mock_instance) + mocker.patch.object( + provider, "get_provider", return_value=MagicMock(spec=MultipassProvider) + ) + mocker.patch("craft_cli.emit.debug") + + build_info = MagicMock() + with provider.instance(build_info, work_dir=tmp_path) as inst: + assert inst is mock_instance + + mock_lxd_inst.save.assert_not_called() + + def test_already_mounted_does_not_add_duplicate( + self, mocker, provider, mock_instance, mock_lxd_inst, tmp_path, monkeypatch + ): + """If disk-dev-losetup already exists in devices, save() is not called again.""" + monkeypatch.setenv("SNAP_DATA", str(tmp_path)) + _patch_super_instance(mocker, mock_instance) + mocker.patch.object( + provider, "get_provider", return_value=MagicMock(spec=LXDProvider) + ) + mocker.patch("craft_cli.emit.debug") + mock_lxd_inst.devices["disk-dev-losetup"] = { + "type": "disk", + "path": "/dev/losetup", + "source": str(tmp_path / "losetup"), + "shift": "true", + } + + build_info = MagicMock() + with provider.instance(build_info, work_dir=tmp_path): + pass + + mock_lxd_inst.save.assert_not_called() + + def test_yields_instance_from_super( + self, mocker, provider, mock_instance, tmp_path, monkeypatch + ): + monkeypatch.delenv("SNAP_DATA", raising=False) + _patch_super_instance(mocker, mock_instance) + mocker.patch.object( + provider, "get_provider", return_value=MagicMock(spec=MultipassProvider) + ) + mocker.patch("craft_cli.emit.debug") + + build_info = MagicMock() + with provider.instance(build_info, work_dir=tmp_path) as inst: + assert inst is mock_instance diff --git a/tests/unit/test_configure.py b/tests/unit/test_configure.py new file mode 100644 index 00000000..121c9a30 --- /dev/null +++ b/tests/unit/test_configure.py @@ -0,0 +1,90 @@ +# Copyright 2026 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Unit tests for snap/hooks/configure – setup_losetup_dir().""" + +import importlib.util +import os +import pathlib +import sys +import types + +import pytest + + +def _load_configure_hook() -> types.ModuleType: + """Load snap/hooks/configure as a Python module (no .py extension).""" + import importlib.machinery + + hook_path = ( + pathlib.Path(__file__).parent.parent.parent.parent / "snap" / "hooks" / "configure" + ) + loader = importlib.machinery.SourceFileLoader("snap_configure", str(hook_path)) + spec = importlib.util.spec_from_loader("snap_configure", loader) + assert spec is not None + module = importlib.util.module_from_spec(spec) + loader.exec_module(module) + return module + + +@pytest.fixture(scope="module") +def configure_module(): + return _load_configure_hook() + + +class TestSetupLosetupDir: + def test_creates_directory(self, configure_module, tmp_path, monkeypatch): + monkeypatch.setenv("SNAP_DATA", str(tmp_path)) + + configure_module.setup_losetup_dir() + + assert (tmp_path / "losetup").is_dir() + + def test_does_not_raise_if_directory_exists( + self, configure_module, tmp_path, monkeypatch + ): + monkeypatch.setenv("SNAP_DATA", str(tmp_path)) + (tmp_path / "losetup").mkdir() # pre-create + + # Should not raise + configure_module.setup_losetup_dir() + + def test_chowns_when_lxd_group_exists( + self, configure_module, tmp_path, monkeypatch, mocker + ): + monkeypatch.setenv("SNAP_DATA", str(tmp_path)) + + mock_gr = mocker.MagicMock() + mock_gr.gr_gid = 1234 + mocker.patch.object(configure_module.grp, "getgrnam", return_value=mock_gr) + mock_chown = mocker.patch.object(configure_module.os, "chown") + + configure_module.setup_losetup_dir() + + losetup_dir = os.path.join(str(tmp_path), "losetup") + mock_chown.assert_called_once_with(losetup_dir, -1, 1234) + + def test_no_chown_when_lxd_group_missing( + self, configure_module, tmp_path, monkeypatch, mocker + ): + monkeypatch.setenv("SNAP_DATA", str(tmp_path)) + + mocker.patch.object( + configure_module.grp, "getgrnam", side_effect=KeyError("lxd") + ) + mock_chown = mocker.patch.object(configure_module.os, "chown") + + configure_module.setup_losetup_dir() + + mock_chown.assert_not_called()