diff --git a/imagecraft/cli.py b/imagecraft/cli.py
index 46c6bd8b..1a25d7a6 100644
--- a/imagecraft/cli.py
+++ b/imagecraft/cli.py
@@ -48,6 +48,9 @@ def register_services() -> None:
ServiceFactory.register(
"project", "ImagecraftProjectService", module="imagecraft.services.project"
)
+ ServiceFactory.register(
+ "provider", "Provider", module="imagecraft.services.provider"
+ )
def _create_app() -> Imagecraft:
diff --git a/imagecraft/losetup/__init__.py b/imagecraft/losetup/__init__.py
new file mode 100644
index 00000000..43b716eb
--- /dev/null
+++ b/imagecraft/losetup/__init__.py
@@ -0,0 +1,145 @@
+# Copyright 2026 Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""losetup client: attach/detach loop devices via the loopserver REST API when
+running inside an LXD container, or via the real losetup binary otherwise.
+"""
+
+import http.client
+import json
+import pathlib
+import socket
+import subprocess
+import urllib.parse
+
+from craft_cli import emit
+
+_LXD_GUEST_SOCK = "/dev/lxd/sock"
+_LOOPSERVER_SOCK = "/dev/losetup/sock"
+
+
+def _is_lxd_container() -> bool:
+ """Return True if we are running inside an LXD container."""
+ if not pathlib.Path(_LXD_GUEST_SOCK).exists():
+ emit.debug(f"LXD guest socket {_LXD_GUEST_SOCK!r} not found; not in a container")
+ return False
+ try:
+ conn = http.client.HTTPConnection("lxd")
+ conn.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ conn.sock.connect(_LXD_GUEST_SOCK)
+ conn.request("GET", "/1.0")
+ data = json.loads(conn.getresponse().read())
+ instance_type = data.get("instance_type")
+ if instance_type == "container":
+ emit.debug(
+ f"Detected LXD container via {_LXD_GUEST_SOCK!r} "
+ f"(instance_type={instance_type!r})"
+ )
+ return True
+ emit.debug(
+ f"LXD guest socket present but instance_type={instance_type!r}; "
+ "not treating as container"
+ )
+ return False
+ except Exception as exc: # noqa: BLE001
+ emit.debug(
+ f"Failed to query LXD guest socket {_LXD_GUEST_SOCK!r}: {exc}; "
+ "assuming not in a container"
+ )
+ return False
+
+
+def _loopserver_request(endpoint: str, path: str) -> list[str]:
+ """POST to the loopserver REST API and return the metadata list."""
+ query = urllib.parse.urlencode({"path": path})
+ conn = http.client.HTTPConnection("loopserver")
+ conn.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ conn.sock.connect(_LOOPSERVER_SOCK)
+ conn.request("POST", f"{endpoint}?{query}")
+ response = json.loads(conn.getresponse().read())
+ if response.get("error_code"):
+ raise RuntimeError(response.get("error", "unknown error from loopserver"))
+ return response["metadata"]
+
+
+def _losetup_attach(path: pathlib.Path) -> list[str]:
+ """Attach *path* as a loop device using the real losetup, return all device paths."""
+ loop_dev = subprocess.run(
+ ["losetup", "--find", "--show", "--partscan", str(path)],
+ capture_output=True,
+ text=True,
+ check=True,
+ ).stdout.strip()
+
+ lsblk_data = json.loads(
+ subprocess.run(
+ ["lsblk", "--json", "--output", "NAME,TYPE", loop_dev],
+ capture_output=True,
+ text=True,
+ check=True,
+ ).stdout
+ )
+ devices = [loop_dev]
+ for child in lsblk_data["blockdevices"][0].get("children", []):
+ if child.get("type") == "part":
+ devices.append(f"/dev/{child['name']}")
+ return devices
+
+
+def _losetup_detach(device: str) -> list[str]:
+ """Detach *device* using the real losetup, return all device paths that were removed."""
+ lsblk_data = json.loads(
+ subprocess.run(
+ ["lsblk", "--json", "--output", "NAME,TYPE", device],
+ capture_output=True,
+ text=True,
+ check=True,
+ ).stdout
+ )
+ devices = [device]
+ for child in lsblk_data["blockdevices"][0].get("children", []):
+ if child.get("type") == "part":
+ devices.append(f"/dev/{child['name']}")
+
+ subprocess.run(["losetup", "--detach", device], check=True)
+ return devices
+
+
+def attach(path: pathlib.Path) -> list[str]:
+ """Attach *path* as a loop device and return all device paths (loop + partitions).
+
+ When running inside an LXD container, delegates to the loopserver REST API.
+ Otherwise calls the real losetup binary directly.
+ """
+ if _is_lxd_container():
+ emit.debug(f"Attaching {path} via loopserver at {_LOOPSERVER_SOCK!r}")
+ return _loopserver_request("/1.0/attach", str(path))
+ emit.debug(f"Attaching {path} via losetup directly")
+ return _losetup_attach(path)
+
+
+def detach(device: str) -> list[str]:
+ """Detach the loop device *device* and return all device paths that were removed.
+
+ When running inside an LXD container, delegates to the loopserver REST API.
+ Otherwise calls the real losetup binary directly.
+ """
+ if _is_lxd_container():
+ emit.debug(f"Detaching {device} via loopserver at {_LOOPSERVER_SOCK!r}")
+ return _loopserver_request("/1.0/detach", device)
+ emit.debug(f"Detaching {device} via losetup directly")
+ return _losetup_detach(device)
+
+
+__all__ = ["attach", "detach"]
diff --git a/imagecraft/losetup/server/__init__.py b/imagecraft/losetup/server/__init__.py
new file mode 100644
index 00000000..0fe43623
--- /dev/null
+++ b/imagecraft/losetup/server/__init__.py
@@ -0,0 +1,19 @@
+# Copyright 2026 Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""losetup server package."""
+
+from imagecraft.losetup.server._server import main
+
+__all__ = ["main"]
diff --git a/imagecraft/losetup/server/__main__.py b/imagecraft/losetup/server/__main__.py
new file mode 100644
index 00000000..7638ee1f
--- /dev/null
+++ b/imagecraft/losetup/server/__main__.py
@@ -0,0 +1,19 @@
+# Copyright 2026 Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Allow running the losetup server with ``python3 -m imagecraft.losetup.server``."""
+
+from imagecraft.losetup.server._server import main
+
+main()
diff --git a/imagecraft/losetup/server/_cgroup.py b/imagecraft/losetup/server/_cgroup.py
new file mode 100644
index 00000000..dfab105e
--- /dev/null
+++ b/imagecraft/losetup/server/_cgroup.py
@@ -0,0 +1,50 @@
+# Copyright 2026 Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""cgroup v2 helpers for determining LXD container membership."""
+
+import socket
+import struct
+
+
+def peer_cgroup(conn: socket.socket) -> str:
+ """Return the cgroup v2 string for the process on the other end of *conn*."""
+ cred = conn.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED, struct.calcsize("3i"))
+ pid, _uid, _gid = struct.unpack("3i", cred)
+ with open(f"/proc/{pid}/cgroup") as f:
+ return f.read()
+
+
+def parse_lxd_location(cgroup: str) -> tuple[str, str] | None:
+ """Return (project, container) from a cgroup v2 string, or None.
+
+ The cgroup v2 entry always has the form ``0::$PATH``. Within that path we
+ walk the components in reverse to find the innermost one that begins with
+ ``lxc.payload.``, then split it to recover the LXD project and container.
+ LXD project names are restricted to [a-zA-Z0-9-] (no underscores), so the
+ first ``_`` is always the delimiter between project and container name.
+ """
+ for line in cgroup.splitlines():
+ if not line.startswith("0::"):
+ continue
+ path = line[len("0::"):]
+ for component in reversed(path.split("/")):
+ if not component.startswith("lxc.payload."):
+ continue
+ project_container = component.split(".")[-1]
+ parts = project_container.split("_", 1)
+ if len(parts) == 2:
+ return parts[0], parts[1]
+ break # found the v2 line but no lxc.payload component
+ return None
diff --git a/imagecraft/losetup/server/_lxd.py b/imagecraft/losetup/server/_lxd.py
new file mode 100644
index 00000000..f0064f7a
--- /dev/null
+++ b/imagecraft/losetup/server/_lxd.py
@@ -0,0 +1,98 @@
+# Copyright 2026 Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""LXD REST API helpers and path translation."""
+
+import http.client
+import json
+import os
+import pathlib
+import socket
+
+LXD_SOCKET = "/var/snap/lxd/common/lxd/unix.socket"
+
+
+def _lxd_connect() -> http.client.HTTPConnection:
+ conn = http.client.HTTPConnection("lxd")
+ conn.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ conn.sock.connect(LXD_SOCKET)
+ return conn
+
+
+def lxd_get(path: str) -> dict:
+ """Make a GET request to the LXD REST API and return the parsed response."""
+ conn = _lxd_connect()
+ conn.request("GET", path)
+ return json.loads(conn.getresponse().read())
+
+
+def lxd_patch(path: str, body: dict) -> dict:
+ """Make a PATCH request to the LXD REST API and return the parsed response."""
+ conn = _lxd_connect()
+ data = json.dumps(body).encode()
+ conn.request("PATCH", path, body=data, headers={"Content-Type": "application/json"})
+ return json.loads(conn.getresponse().read())
+
+
+def find_free_loop_slot(devices: dict) -> int:
+ """Find the smallest N such that no ``imagecraft-loopN*`` device exists."""
+ used: set[int] = set()
+ for name in devices:
+ if name.startswith("imagecraft-loop"):
+ num_str = name[len("imagecraft-loop"):].split("p")[0]
+ if num_str.isdigit():
+ used.add(int(num_str))
+ n = 0
+ while n in used:
+ n += 1
+ return n
+
+
+def convert_container_path_to_host_path(
+ project: str, container: str, container_path: str
+) -> pathlib.Path:
+ """Translate *container_path* to an absolute path on the host.
+
+ Queries the LXD API for the container's disk devices, finds the one with
+ the longest ``path`` prefix that matches *container_path* and has a
+ ``source``, then substitutes that source for the prefix.
+
+ Raises ``ValueError`` if no sourced device covers the given path.
+ """
+ data = lxd_get(f"/1.0/instances/{container}?project={project}")
+ devices = data["metadata"]["expanded_devices"]
+
+ # Collect disk devices that bind-mount a host source path.
+ sourced = [
+ (dev["path"].rstrip("/"), dev["source"])
+ for dev in devices.values()
+ if dev.get("type") == "disk" and "source" in dev
+ ]
+
+ # Normalise the container path and find the longest matching mount prefix.
+ norm = os.path.normpath(container_path)
+ best_mount = best_source = None
+ for mount, source in sourced:
+ if norm == mount or norm.startswith(mount + "/"):
+ if best_mount is None or len(mount) > len(best_mount):
+ best_mount = mount
+ best_source = source
+
+ if best_mount is None:
+ raise ValueError(
+ f"{container_path!r} is not under any sourced device in {container!r}"
+ )
+
+ relative = norm[len(best_mount):] # may be "" if exact match
+ return pathlib.Path(best_source + relative)
diff --git a/imagecraft/losetup/server/_server.py b/imagecraft/losetup/server/_server.py
new file mode 100644
index 00000000..cf52cf53
--- /dev/null
+++ b/imagecraft/losetup/server/_server.py
@@ -0,0 +1,201 @@
+# Copyright 2026 Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""losetup server: accept one connection, handle a loopback device request, then exit.
+
+Socket activation is used (systemd passes a pre-bound socket via LISTEN_FDS).
+"""
+
+import http.server
+import json
+import os
+import pathlib
+import socket
+import subprocess
+import urllib.parse
+
+from imagecraft.losetup.server._cgroup import parse_lxd_location, peer_cgroup
+from imagecraft.losetup.server._lxd import (
+ convert_container_path_to_host_path,
+ find_free_loop_slot,
+ lxd_get,
+ lxd_patch,
+)
+
+_SD_LISTEN_FDS_START = 3
+
+
+def _get_listening_socket() -> socket.socket:
+ """Return the listening socket passed by systemd socket activation."""
+ listen_pid = int(os.environ.get("LISTEN_PID", 0))
+ listen_fds = int(os.environ.get("LISTEN_FDS", 0))
+ if listen_pid != os.getpid() or listen_fds < 1:
+ raise RuntimeError(
+ f"expected socket activation (LISTEN_PID={listen_pid}, "
+ f"LISTEN_FDS={listen_fds}, pid={os.getpid()})"
+ )
+ return socket.fromfd(_SD_LISTEN_FDS_START, socket.AF_UNIX, socket.SOCK_STREAM)
+
+
+def _handle_attach(project: str, container: str, container_path: str) -> list[str]:
+ host_path = convert_container_path_to_host_path(project, container, container_path)
+
+ # Attach the image file as a loop device.
+ loop_dev = subprocess.run(
+ ["losetup", "--find", "--show", "--partscan", str(host_path)],
+ capture_output=True,
+ text=True,
+ check=True,
+ ).stdout.strip()
+
+ # Discover partitions via lsblk.
+ lsblk_data = json.loads(
+ subprocess.run(
+ ["lsblk", "--json", "--output", "NAME,TYPE", loop_dev],
+ capture_output=True,
+ text=True,
+ check=True,
+ ).stdout
+ )
+ loop_name = pathlib.Path(loop_dev).name # e.g. "loop133"
+ partitions = [
+ child["name"]
+ for child in lsblk_data["blockdevices"][0].get("children", [])
+ if child.get("type") == "part"
+ ]
+
+ # Find a free in-container imagecraft-loopN slot.
+ instance_data = lxd_get(f"/1.0/instances/{container}?project={project}")
+ current_devices = instance_data["metadata"]["expanded_devices"]
+ slot = find_free_loop_slot(current_devices)
+ loop_alias = f"imagecraft-loop{slot}" # e.g. "imagecraft-loop0"
+
+ # Build the new device entries to add to the container.
+ new_devices: dict[str, dict] = {
+ loop_alias: {
+ "type": "unix-block",
+ "path": f"/dev/{loop_alias}",
+ "source": loop_dev,
+ }
+ }
+ for part_name in partitions:
+ part_suffix = part_name[len(loop_name):] # e.g. "p1"
+ dev_alias = f"{loop_alias}{part_suffix}"
+ new_devices[dev_alias] = {
+ "type": "unix-block",
+ "path": f"/dev/{dev_alias}",
+ "source": f"/dev/{part_name}",
+ }
+
+ lxd_patch(f"/1.0/instances/{container}?project={project}", {"devices": new_devices})
+
+ return [dev["path"] for dev in new_devices.values()]
+
+
+def _handle_detach(project: str, container: str, container_dev_path: str) -> list[str]:
+ # Find the LXD device name matching this in-container path.
+ instance_data = lxd_get(f"/1.0/instances/{container}?project={project}")
+ current_devices = instance_data["metadata"]["expanded_devices"]
+
+ loop_dev = next(
+ (
+ dev["source"]
+ for dev in current_devices.values()
+ if dev.get("type") == "unix-block" and dev.get("path") == container_dev_path
+ ),
+ None,
+ )
+ if loop_dev is None:
+ raise ValueError(
+ f"no unix-block device with path {container_dev_path!r} in {container!r}"
+ )
+
+ # Remove all container devices backed by this loop device or its partitions.
+ devices_to_remove = {
+ name: None
+ for name, dev in current_devices.items()
+ if dev.get("type") == "unix-block"
+ and (
+ dev.get("source") == loop_dev
+ or dev.get("source", "").startswith(loop_dev + "p")
+ )
+ }
+ removed_paths = [current_devices[name]["path"] for name in devices_to_remove]
+ lxd_patch(
+ f"/1.0/instances/{container}?project={project}",
+ {"devices": devices_to_remove},
+ )
+
+ subprocess.run(["losetup", "--detach", loop_dev], check=True)
+
+ return removed_paths
+
+
+class _RequestHandler(http.server.BaseHTTPRequestHandler):
+ """HTTP request handler for the loopserver REST API."""
+
+ def __init__(self, project: str, container: str, *args, **kwargs) -> None:
+ self._project = project
+ self._container = container
+ super().__init__(*args, **kwargs)
+
+ def do_POST(self) -> None: # noqa: N802
+ parsed = urllib.parse.urlparse(self.path)
+ params = urllib.parse.parse_qs(parsed.query)
+ path_values = params.get("path", [])
+ if not path_values:
+ self._send_error(400, "missing 'path' query parameter")
+ return
+ path = path_values[0]
+
+ try:
+ if parsed.path == "/1.0/attach":
+ devices = _handle_attach(self._project, self._container, path)
+ elif parsed.path == "/1.0/detach":
+ devices = _handle_detach(self._project, self._container, path)
+ else:
+ self._send_error(404, f"unknown endpoint {parsed.path!r}")
+ return
+ except Exception as exc: # noqa: BLE001
+ self._send_error(500, str(exc))
+ return
+
+ self._send_json(200, {"status": "Success", "status_code": 200, "metadata": devices})
+
+ def _send_json(self, code: int, body: dict) -> None:
+ data = json.dumps(body).encode()
+ self.send_response(code)
+ self.send_header("Content-Type", "application/json")
+ self.send_header("Content-Length", str(len(data)))
+ self.end_headers()
+ self.wfile.write(data)
+
+ def _send_error(self, code: int, message: str) -> None:
+ self._send_json(code, {"error": message, "error_code": code})
+
+ def log_message(self, format: str, *args: object) -> None: # noqa: A002
+ pass # suppress default stderr logging
+
+
+def main() -> None:
+ """Entry point for the losetup server daemon."""
+ sock = _get_listening_socket()
+ conn, addr = sock.accept()
+ with conn:
+ location = parse_lxd_location(peer_cgroup(conn))
+ if location is None or location[0] != "imagecraft":
+ return
+ project, container = location
+ _RequestHandler(project, container, conn, addr, None)
+ sock.close()
diff --git a/imagecraft/plugins/mmdebstrap_plugin.py b/imagecraft/plugins/mmdebstrap_plugin.py
index d70159ba..2caf5521 100644
--- a/imagecraft/plugins/mmdebstrap_plugin.py
+++ b/imagecraft/plugins/mmdebstrap_plugin.py
@@ -88,7 +88,7 @@ def get_build_commands(self) -> list[str]:
cmd: list[str] = [
"mmdebstrap",
f"--arch={self._part_info.target_arch}",
- "--mode=root",
+ "--mode=fakeroot",
f"--variant={options.mmdebstrap_variant}",
"--format=dir",
]
diff --git a/imagecraft/services/image.py b/imagecraft/services/image.py
index 18bf48f7..f0781304 100644
--- a/imagecraft/services/image.py
+++ b/imagecraft/services/image.py
@@ -16,23 +16,19 @@
import atexit
import contextlib
-import json
import pathlib
import shutil
-import subprocess
import time
from collections.abc import Mapping
-from typing import Any, cast
+from typing import cast
from craft_application import AppMetadata, AppService, ServiceFactory
from craft_cli import CraftError, emit
+from imagecraft import losetup
from imagecraft.models import Project
from imagecraft.models.volume import PartitionSchema
from imagecraft.pack import gptutil
-from imagecraft.subprocesses import run
-
-_LOSETUP_BIN = "losetup"
class ImageService(AppService):
@@ -94,20 +90,11 @@ def create_images(self) -> Mapping[str, pathlib.Path]:
return self._images
- def _get_all_loop_devices(self) -> list[dict[str, Any]]:
- """Return a list of all loop devices on the system."""
- try:
- result = run(_LOSETUP_BIN, "--json")
- return cast(list[dict[str, Any]], json.loads(result.stdout)["loopdevices"])
- except (subprocess.CalledProcessError, KeyError, json.JSONDecodeError):
- return []
-
def attach_images(self) -> Mapping[str, str]:
"""Attach all created images as loop devices.
This method is idempotent. It will reuse existing loop devices if they
- are already attached to the correct files, and clean up stale devices
- pointing to deleted inodes.
+ are already attached to the correct files.
"""
if self._loop_devices:
return self._loop_devices
@@ -115,47 +102,16 @@ def attach_images(self) -> Mapping[str, str]:
if self._images is None:
raise ValueError("Images must be created before attaching.")
- all_devices = self._get_all_loop_devices()
-
for name, image_path in self._images.items():
- attached_device: str | None = None
-
- # 1. Check for existing devices pointing to this file.
- for dev in all_devices:
- back_file = pathlib.Path(dev["back-file"])
- try:
- if image_path.samefile(back_file):
- attached_device = dev["name"]
- emit.debug(
- f"Reusing existing loop device {attached_device} for {image_path}"
- )
- break
- except FileNotFoundError:
- # Stale inode: file deleted and recreated.
- if back_file == image_path:
- emit.debug(
- f"Detaching stale loop device {dev['name']} for {image_path}"
- )
- run(_LOSETUP_BIN, "-d", dev["name"])
-
- # 2. Attach a fresh device if none was found/reused.
- if not attached_device:
- try:
- attached_device = run(
- _LOSETUP_BIN,
- "--find",
- "--show",
- "--partscan",
- str(image_path),
- ).stdout.strip()
- emit.debug(f"Attached {image_path} as {attached_device}")
- except subprocess.CalledProcessError as err:
- raise CraftError(
- f"Failed to attach loop device for {image_path}.",
- details=str(err),
- resolution="Ensure loop devices are available and you have sufficient permissions (sudo).",
- ) from err
-
+ try:
+ attached_device = losetup.attach(image_path)[0]
+ emit.debug(f"Attached {image_path} as {attached_device}")
+ except Exception as err:
+ raise CraftError(
+ f"Failed to attach loop device for {image_path}.",
+ details=str(err),
+ resolution="Ensure loop devices are available and you have sufficient permissions (sudo).",
+ ) from err
self._loop_devices[name] = attached_device
if not self._atexit_registered:
@@ -174,7 +130,7 @@ def detach_images(self) -> None:
start_time = time.monotonic()
while time.monotonic() - start_time < 10: # noqa: PLR2004 (10 seconds)
try:
- run(_LOSETUP_BIN, "-d", device)
+ losetup.detach(device)
success = True
break
except Exception: # noqa: BLE001
diff --git a/imagecraft/services/provider.py b/imagecraft/services/provider.py
index 083fe891..fcbd0ae1 100644
--- a/imagecraft/services/provider.py
+++ b/imagecraft/services/provider.py
@@ -14,14 +14,69 @@
"""Imagecraft provider service."""
+import contextlib
+import os
+import pathlib
+from collections.abc import Generator
+from typing import Any
+
+import craft_platforms
import craft_providers
from craft_application.services.provider import ProviderService
from craft_cli import CraftError, emit
+from craft_providers.lxd import LXDInstance, LXDProvider
class Provider(ProviderService):
"""Imagecraft-specific project service."""
+ @contextlib.contextmanager
+ def instance(
+ self,
+ build_info: craft_platforms.BuildInfo,
+ *,
+ work_dir: pathlib.Path,
+ **kwargs: Any,
+ ) -> Generator[craft_providers.Executor, None, None]:
+ """Context manager for a provider instance.
+
+ When using the LXD provider, also mounts $SNAP_DATA/losetup into
+ /dev/losetup inside the instance so the losetup server socket is
+ accessible.
+ """
+ with super().instance(build_info, work_dir=work_dir, **kwargs) as instance:
+ if isinstance(self.get_provider(), LXDProvider) and isinstance(
+ instance, LXDInstance
+ ):
+ snap_data = os.environ.get("SNAP_DATA", "")
+ if snap_data:
+ losetup_host = pathlib.Path(snap_data) / "losetup"
+ losetup_host.mkdir(parents=True, exist_ok=True)
+ self._mount_with_shift(instance, losetup_host)
+ emit.debug(f"Mounted {losetup_host} -> /dev/losetup in instance")
+ else:
+ emit.debug("SNAP_DATA not set; skipping losetup mount")
+ yield instance
+
+ @staticmethod
+ def _mount_with_shift(instance: LXDInstance, host_source: pathlib.Path) -> None:
+ """Mount host_source into /dev/losetup in the instance with UID/GID shifting.
+
+ Uses pylxd directly to set shift=true, which remaps host UIDs/GIDs into
+ the container's namespace so the directory appears owned by root inside.
+ """
+ device_name = "disk-dev-losetup"
+ target = "/dev/losetup"
+ lxd_inst = instance._client.instances.get(instance.instance_name)
+ if device_name not in lxd_inst.devices:
+ lxd_inst.devices[device_name] = {
+ "type": "disk",
+ "source": str(host_source),
+ "path": target,
+ "shift": "true",
+ }
+ lxd_inst.save(wait=True)
+
def get_provider(self, name: str | None = None) -> craft_providers.Provider:
"""Get the provider to use. This method is a workaround for #253.
diff --git a/snap/hooks/configure b/snap/hooks/configure
index 69099ddf..5562b42e 100644
--- a/snap/hooks/configure
+++ b/snap/hooks/configure
@@ -2,6 +2,8 @@
"""Hook to configure imagecraft snap."""
+import grp
+import os
import sys
import snaphelpers
@@ -9,6 +11,19 @@ from craft_application.errors import CraftValidationError
from craft_application.util import SnapConfig
+def setup_losetup_dir() -> None:
+ """Create $SNAP_DATA/losetup and set lxd group ownership if available."""
+ losetup_dir = os.path.join(os.environ["SNAP_DATA"], "losetup")
+ os.makedirs(losetup_dir, exist_ok=True)
+
+ try:
+ lxd_gid = grp.getgrnam("lxd").gr_gid
+ except KeyError:
+ return
+
+ os.chown(losetup_dir, -1, lxd_gid)
+
+
def validate_snap_config() -> None:
"""Validate snap configuration."""
snap_config = snaphelpers.SnapConfigOptions(keys=["provider"])
@@ -23,4 +38,5 @@ def validate_snap_config() -> None:
if __name__ == "__main__":
+ setup_losetup_dir()
validate_snap_config()
diff --git a/snap/snapcraft.yaml b/snap/snapcraft.yaml
index ff50f1a3..9ae5e865 100644
--- a/snap/snapcraft.yaml
+++ b/snap/snapcraft.yaml
@@ -26,6 +26,14 @@ apps:
PATH: "$SNAP/libexec/imagecraft:/snap/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
PERL5LIB: $SNAP/usr/share/perl5:$SNAP/usr/share/perl:$PERL5LIB
completer: completion.sh
+ losetup-server:
+ command: bin/python3 -m imagecraft.losetup.server
+ daemon: simple
+ daemon-scope: system
+ sockets:
+ losetup-socket:
+ listen-stream: $SNAP_DATA/losetup/sock
+ socket-mode: 0660
build-packages:
- git
diff --git a/spread.yaml b/spread.yaml
index 7d8c503c..d9be9d55 100644
--- a/spread.yaml
+++ b/spread.yaml
@@ -7,7 +7,7 @@ environment:
LANGUAGE: "en"
# build the snap with lxd
PROJECT_PATH: /home/imagecraft
- CRAFT_BUILD_ENVIRONMENT: host # Build in destructive mode until we can solve https://github.com/canonical/imagecraft/issues/253
+ #CRAFT_BUILD_ENVIRONMENT: host # Build in destructive mode until we can solve https://github.com/canonical/imagecraft/issues/253
SNAPD_TESTING_TOOLS: $PROJECT_PATH/tools/external/tools
PATH: /snap/bin:$PATH:$SNAPD_TESTING_TOOLS
diff --git a/tests/integration/losetup/__init__.py b/tests/integration/losetup/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/integration/losetup/server/__init__.py b/tests/integration/losetup/server/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/integration/losetup/server/test_cgroup.py b/tests/integration/losetup/server/test_cgroup.py
new file mode 100644
index 00000000..d2bab84e
--- /dev/null
+++ b/tests/integration/losetup/server/test_cgroup.py
@@ -0,0 +1,83 @@
+# Copyright 2026 Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Integration tests for imagecraft/losetup/server/_cgroup.py.
+
+peer_cgroup() reads SO_PEERCRED from a real socket and opens /proc//cgroup.
+No root required (both ends of the socket pair are in the same process).
+"""
+
+import os
+import socket
+
+import pytest
+
+from imagecraft.losetup.server._cgroup import parse_lxd_location, peer_cgroup
+
+
+class TestPeerCgroupIntegration:
+ def test_returns_string_for_current_process(self):
+ """peer_cgroup reads /proc//cgroup via SO_PEERCRED."""
+ client, server = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+ try:
+ result = peer_cgroup(server)
+ finally:
+ client.close()
+ server.close()
+
+ assert isinstance(result, str)
+ assert len(result) > 0
+
+ def test_matches_proc_cgroup_of_current_process(self):
+ """The cgroup string must match /proc//cgroup."""
+ pid = os.getpid()
+ with open(f"/proc/{pid}/cgroup") as f:
+ expected = f.read()
+
+ client, server = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+ try:
+ result = peer_cgroup(server)
+ finally:
+ client.close()
+ server.close()
+
+ assert result == expected
+
+ def test_cgroup_contains_0_line(self):
+ """The cgroup v2 unified hierarchy starts with '0::'."""
+ client, server = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+ try:
+ cgroup = peer_cgroup(server)
+ finally:
+ client.close()
+ server.close()
+
+ lines = cgroup.splitlines()
+ # At least one line (cgroup v2 pure or hybrid)
+ assert len(lines) >= 1
+
+ def test_parse_lxd_location_does_not_crash_on_real_cgroup(self):
+ """parse_lxd_location should not raise for any real cgroup string."""
+ client, server = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+ try:
+ cgroup = peer_cgroup(server)
+ finally:
+ client.close()
+ server.close()
+
+ # Should return None (we're not in an LXD container in tests) or a tuple
+ result = parse_lxd_location(cgroup)
+ assert result is None or (
+ isinstance(result, tuple) and len(result) == 2
+ )
diff --git a/tests/integration/losetup/server/test_server.py b/tests/integration/losetup/server/test_server.py
new file mode 100644
index 00000000..f2a01e5e
--- /dev/null
+++ b/tests/integration/losetup/server/test_server.py
@@ -0,0 +1,231 @@
+# Copyright 2026 Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Integration tests for imagecraft/losetup/server/_server.py.
+
+Full round-trip: real socket pair, real HTTP request/response cycle,
+with mocked LXD API and losetup calls so no LXD daemon is required.
+Tests that need real losetup are marked requires_root.
+"""
+
+import json
+import os
+import socket
+import threading
+
+import pytest
+
+from imagecraft.losetup.server._server import (
+ _RequestHandler,
+ _handle_attach,
+ _handle_detach,
+ main,
+)
+
+
+# ─── helpers ──────────────────────────────────────────────────────────────────
+
+
+def _make_request(method: str, path: str) -> bytes:
+ return (
+ f"{method} {path} HTTP/1.0\r\n"
+ f"Host: localhost\r\n"
+ f"\r\n"
+ ).encode()
+
+
+def _send_and_recv(request_bytes: bytes, project: str, container: str) -> tuple[int, dict]:
+ """Send *request_bytes* to a _RequestHandler and return (status, body)."""
+ client, server = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+ try:
+ client.sendall(request_bytes)
+ _RequestHandler(project, container, server, ("", 0), None)
+ finally:
+ server.close()
+
+ raw = b""
+ client.settimeout(3.0)
+ try:
+ while True:
+ chunk = client.recv(4096)
+ if not chunk:
+ break
+ raw += chunk
+ except socket.timeout:
+ pass
+ finally:
+ client.close()
+
+ header_end = raw.find(b"\r\n\r\n")
+ header_section = raw[:header_end].decode()
+ body_bytes = raw[header_end + 4:]
+ status_code = int(header_section.split("\r\n")[0].split()[1])
+ return status_code, json.loads(body_bytes)
+
+
+# ─── full HTTP round-trip (mocked LXD + subprocess) ──────────────────────────
+
+
+class TestServerRoundTrip:
+ def test_attach_full_cycle(self, mocker):
+ mocker.patch(
+ "imagecraft.losetup.server._server._handle_attach",
+ return_value=["/dev/imagecraft-loop0", "/dev/imagecraft-loop0p1"],
+ )
+
+ req = _make_request("POST", "/1.0/attach?path=/root/project/disk.img")
+ code, body = _send_and_recv(req, "imagecraft", "my-container")
+
+ assert code == 200
+ assert body["status_code"] == 200
+ assert "/dev/imagecraft-loop0" in body["metadata"]
+ assert "/dev/imagecraft-loop0p1" in body["metadata"]
+
+ def test_detach_full_cycle(self, mocker):
+ mocker.patch(
+ "imagecraft.losetup.server._server._handle_detach",
+ return_value=["/dev/imagecraft-loop0", "/dev/imagecraft-loop0p1"],
+ )
+
+ req = _make_request("POST", "/1.0/detach?path=/dev/imagecraft-loop0")
+ code, body = _send_and_recv(req, "imagecraft", "my-container")
+
+ assert code == 200
+ assert "/dev/imagecraft-loop0" in body["metadata"]
+
+ def test_missing_path_param(self):
+ req = _make_request("POST", "/1.0/attach")
+ code, body = _send_and_recv(req, "imagecraft", "my-container")
+
+ assert code == 400
+ assert body["error_code"] == 400
+
+ def test_unknown_endpoint(self):
+ req = _make_request("POST", "/1.0/badendpoint?path=/foo")
+ code, body = _send_and_recv(req, "imagecraft", "my-container")
+
+ assert code == 404
+
+ def test_handler_exception_returns_500(self, mocker):
+ mocker.patch(
+ "imagecraft.losetup.server._server._handle_attach",
+ side_effect=ValueError("boom"),
+ )
+
+ req = _make_request("POST", "/1.0/attach?path=/root/disk.img")
+ code, body = _send_and_recv(req, "imagecraft", "my-container")
+
+ assert code == 500
+ assert "boom" in body["error"]
+
+
+# ─── main() – non-imagecraft connection is silently rejected ──────────────────
+
+
+class TestMainRejectsNonImagecraft:
+ def test_non_imagecraft_project_closes_without_dispatch(self, mocker, tmp_path):
+ """main() reads cgroup, finds project != 'imagecraft', returns immediately."""
+ sock_path = tmp_path / "server.sock"
+
+ # Create a listening socket ourselves
+ listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ listener.bind(str(sock_path))
+ listener.listen(1)
+
+ mocker.patch(
+ "imagecraft.losetup.server._server._get_listening_socket",
+ return_value=listener,
+ )
+ # Peer cgroup claims a non-imagecraft project
+ mocker.patch(
+ "imagecraft.losetup.server._server.peer_cgroup",
+ return_value="0::/lxc.payload.snapcraft_some-container\n",
+ )
+ mock_handler = mocker.patch(
+ "imagecraft.losetup.server._server._RequestHandler"
+ )
+
+ client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ client.connect(str(sock_path))
+
+ main()
+
+ client.close()
+ listener.close()
+
+ # Handler should NOT have been called
+ mock_handler.assert_not_called()
+
+ def test_imagecraft_project_dispatches_to_handler(self, mocker, tmp_path):
+ """main() dispatches to _RequestHandler when project == 'imagecraft'."""
+ sock_path = tmp_path / "server.sock"
+
+ listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ listener.bind(str(sock_path))
+ listener.listen(1)
+
+ mocker.patch(
+ "imagecraft.losetup.server._server._get_listening_socket",
+ return_value=listener,
+ )
+ mocker.patch(
+ "imagecraft.losetup.server._server.peer_cgroup",
+ return_value="0::/lxc.payload.imagecraft_my-container\n",
+ )
+ mock_handler = mocker.patch(
+ "imagecraft.losetup.server._server._RequestHandler"
+ )
+
+ client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ client.connect(str(sock_path))
+
+ main()
+
+ client.close()
+ listener.close()
+
+ mock_handler.assert_called_once()
+ call_args = mock_handler.call_args[0]
+ assert call_args[0] == "imagecraft"
+ assert call_args[1] == "my-container"
+
+ def test_none_location_closes_without_dispatch(self, mocker, tmp_path):
+ """main() returns immediately when parse_lxd_location returns None."""
+ sock_path = tmp_path / "server.sock"
+
+ listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ listener.bind(str(sock_path))
+ listener.listen(1)
+
+ mocker.patch(
+ "imagecraft.losetup.server._server._get_listening_socket",
+ return_value=listener,
+ )
+ mocker.patch(
+ "imagecraft.losetup.server._server.peer_cgroup",
+ return_value="0::/\n", # not in container
+ )
+ mock_handler = mocker.patch(
+ "imagecraft.losetup.server._server._RequestHandler"
+ )
+
+ client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ client.connect(str(sock_path))
+
+ main()
+
+ client.close()
+ listener.close()
+
+ mock_handler.assert_not_called()
diff --git a/tests/integration/losetup/test_client.py b/tests/integration/losetup/test_client.py
new file mode 100644
index 00000000..5d027b31
--- /dev/null
+++ b/tests/integration/losetup/test_client.py
@@ -0,0 +1,79 @@
+# Copyright 2026 Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Integration tests for imagecraft/losetup/__init__.py.
+
+These tests call the real losetup binary and therefore require root.
+"""
+
+import pathlib
+
+import pytest
+
+from imagecraft.losetup import _losetup_attach, _losetup_detach
+
+
+@pytest.mark.requires_root
+class TestLosetupAttachDetachIntegration:
+ def test_attach_returns_loop_device(self, tmp_path):
+ img = tmp_path / "disk.img"
+ # Create a 10 MiB sparse file
+ img.write_bytes(b"\0" * (10 * 1024 * 1024))
+
+ devices = _losetup_attach(img)
+
+ try:
+ assert len(devices) >= 1
+ loop_dev = devices[0]
+ assert loop_dev.startswith("/dev/loop")
+ assert pathlib.Path(loop_dev).exists()
+ finally:
+ import subprocess
+
+ subprocess.run(["losetup", "--detach", devices[0]], check=False)
+
+ def test_detach_removes_device(self, tmp_path):
+ img = tmp_path / "disk.img"
+ img.write_bytes(b"\0" * (10 * 1024 * 1024))
+
+ import subprocess
+
+ loop_dev = (
+ subprocess.run(
+ ["losetup", "--find", "--show", str(img)],
+ capture_output=True,
+ text=True,
+ check=True,
+ ).stdout.strip()
+ )
+
+ try:
+ removed = _losetup_detach(loop_dev)
+ assert loop_dev in removed
+ assert not pathlib.Path(loop_dev).exists()
+ except Exception:
+ subprocess.run(["losetup", "--detach", loop_dev], check=False)
+ raise
+
+ def test_attach_and_detach_roundtrip(self, tmp_path):
+ img = tmp_path / "disk.img"
+ img.write_bytes(b"\0" * (10 * 1024 * 1024))
+
+ devices = _losetup_attach(img)
+ loop_dev = devices[0]
+ assert pathlib.Path(loop_dev).exists()
+
+ removed = _losetup_detach(loop_dev)
+ assert loop_dev in removed
+ assert not pathlib.Path(loop_dev).exists()
diff --git a/tests/integration/services/test_provider.py b/tests/integration/services/test_provider.py
new file mode 100644
index 00000000..5ca25fb4
--- /dev/null
+++ b/tests/integration/services/test_provider.py
@@ -0,0 +1,329 @@
+# Copyright 2026 Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Integration tests for Provider.instance with a live losetup server thread."""
+
+import contextlib
+import http.client
+import json
+import os
+import pathlib
+import socket
+import threading
+import urllib.parse
+from collections.abc import Generator
+from unittest.mock import MagicMock, patch
+
+import pytest
+from craft_application import AppMetadata, ServiceFactory
+from craft_application.services.provider import ProviderService
+from craft_providers.lxd import LXDInstance, LXDProvider
+
+from imagecraft.losetup.server._server import _RequestHandler
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _make_server_socket(path: pathlib.Path) -> socket.socket:
+ """Create and bind a listening Unix socket at *path*."""
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.bind(str(path))
+ os.chmod(path, 0o666)
+ sock.listen(1)
+ return sock
+
+
+class _LoopServerThread(threading.Thread):
+ """Accept connections and dispatch to _RequestHandler in a background thread."""
+
+ def __init__(self, sock: socket.socket, project: str, container: str) -> None:
+ super().__init__(daemon=True)
+ self._sock = sock
+ self._project = project
+ self._container = container
+ self._stop_event = threading.Event()
+ self._sock.settimeout(0.5)
+
+ def run(self) -> None:
+ while not self._stop_event.is_set():
+ try:
+ conn, addr = self._sock.accept()
+ except TimeoutError:
+ continue
+ try:
+ _RequestHandler(self._project, self._container, conn, addr, None)
+ finally:
+ conn.close()
+
+ def stop(self) -> None:
+ self._stop_event.set()
+ self.join(timeout=3)
+
+
+def _http_post(sock_path: pathlib.Path, endpoint: str, path: str) -> dict:
+ """Send a POST request to the loop server and return the parsed JSON."""
+ query = urllib.parse.urlencode({"path": path})
+ conn = http.client.HTTPConnection("loopserver")
+ conn.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ conn.sock.connect(str(sock_path))
+ conn.request("POST", f"{endpoint}?{query}")
+ return json.loads(conn.getresponse().read())
+
+
+def _make_provider_service(
+ mock_provider: object,
+ mock_services: MagicMock,
+ mock_app: MagicMock,
+) -> "imagecraft.services.provider.Provider":
+ """Construct a Provider service with the given provider pre-cached."""
+ from imagecraft.services.provider import Provider
+
+ svc = Provider(mock_app, mock_services, work_dir=pathlib.Path("/tmp"))
+ svc._provider = mock_provider
+ return svc
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture
+def losetup_socket_dir(tmp_path: pathlib.Path) -> pathlib.Path:
+ d = tmp_path / "snap_data" / "losetup"
+ d.mkdir(parents=True)
+ return d
+
+
+@pytest.fixture
+def loop_server(
+ losetup_socket_dir: pathlib.Path,
+) -> Generator[tuple[pathlib.Path, _LoopServerThread], None, None]:
+ """Start a _LoopServerThread on a temp socket. Yields (socket_path, thread)."""
+ sock_path = losetup_socket_dir / "sock"
+ srv_sock = _make_server_socket(sock_path)
+ thread = _LoopServerThread(srv_sock, project="imagecraft", container="test-container")
+ thread.start()
+ yield sock_path, thread
+ thread.stop()
+ srv_sock.close()
+
+
+@pytest.fixture
+def mock_app() -> MagicMock:
+ app = MagicMock(spec=AppMetadata)
+ app.name = "imagecraft"
+ return app
+
+
+@pytest.fixture
+def mock_services() -> MagicMock:
+ svc = MagicMock(spec=ServiceFactory)
+ project = MagicMock()
+ project.name = "test-project"
+ svc.get.return_value.get.return_value = project
+ return svc
+
+
+# ---------------------------------------------------------------------------
+# Tests: Provider.instance losetup mount behaviour
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture
+def mock_lxd_inst():
+ """A mock pylxd instance (returned by _client.instances.get())."""
+ inst = MagicMock()
+ inst.devices = {}
+ return inst
+
+
+@pytest.fixture
+def mock_lxd_provider_instance(mock_lxd_inst):
+ """A mock LXDInstance with a wired-up pylxd _client."""
+ m = MagicMock(spec=LXDInstance)
+ m.instance_name = "test-container"
+ m._client = MagicMock()
+ m._client.instances.get.return_value = mock_lxd_inst
+ return m
+
+
+@contextlib.contextmanager
+def _fake_super_instance(self, build_info, *, work_dir, **kwargs):
+ """Stub for ProviderService.instance that yields a fresh MagicMock."""
+ yield MagicMock()
+
+
+@pytest.mark.requires_root
+def test_provider_instance_mounts_losetup_for_lxd(
+ tmp_path: pathlib.Path,
+ monkeypatch: pytest.MonkeyPatch,
+ loop_server: tuple[pathlib.Path, _LoopServerThread],
+ losetup_socket_dir: pathlib.Path,
+ mock_app: MagicMock,
+ mock_services: MagicMock,
+ mock_lxd_inst: MagicMock,
+ mock_lxd_provider_instance: MagicMock,
+) -> None:
+ """Provider.instance mounts $SNAP_DATA/losetup when using the LXD provider."""
+ snap_data = losetup_socket_dir.parent
+ monkeypatch.setenv("SNAP_DATA", str(snap_data))
+
+ provider_svc = _make_provider_service(
+ MagicMock(spec=LXDProvider), mock_services, mock_app
+ )
+
+ @contextlib.contextmanager
+ def fake_super_with_lxd_instance(self, build_info, *, work_dir, **kwargs):
+ yield mock_lxd_provider_instance
+
+ with patch.object(ProviderService, "instance", fake_super_with_lxd_instance):
+ with provider_svc.instance(MagicMock(), work_dir=tmp_path / "work"):
+ pass
+
+ expected_host = snap_data / "losetup"
+ assert expected_host.exists()
+ assert "disk-dev-losetup" in mock_lxd_inst.devices
+ device = mock_lxd_inst.devices["disk-dev-losetup"]
+ assert device["type"] == "disk"
+ assert device["path"] == "/dev/losetup"
+ assert device["source"] == str(expected_host)
+ assert device["shift"] == "true"
+ mock_lxd_inst.save.assert_called_once_with(wait=True)
+
+
+def test_provider_instance_skips_losetup_when_no_snap_data(
+ tmp_path: pathlib.Path,
+ monkeypatch: pytest.MonkeyPatch,
+ mock_app: MagicMock,
+ mock_services: MagicMock,
+ mock_lxd_inst: MagicMock,
+ mock_lxd_provider_instance: MagicMock,
+) -> None:
+ """Provider.instance does NOT mount losetup when SNAP_DATA is unset."""
+ monkeypatch.delenv("SNAP_DATA", raising=False)
+
+ provider_svc = _make_provider_service(
+ MagicMock(spec=LXDProvider), mock_services, mock_app
+ )
+
+ @contextlib.contextmanager
+ def fake_super_with_lxd_instance(self, build_info, *, work_dir, **kwargs):
+ yield mock_lxd_provider_instance
+
+ with patch.object(ProviderService, "instance", fake_super_with_lxd_instance):
+ with provider_svc.instance(MagicMock(), work_dir=tmp_path / "work"):
+ pass
+
+ mock_lxd_inst.save.assert_not_called()
+
+
+def test_provider_instance_skips_losetup_for_non_lxd(
+ tmp_path: pathlib.Path,
+ monkeypatch: pytest.MonkeyPatch,
+ mock_app: MagicMock,
+ mock_services: MagicMock,
+ mock_lxd_inst: MagicMock,
+ mock_lxd_provider_instance: MagicMock,
+) -> None:
+ """Provider.instance does NOT mount losetup for non-LXD providers."""
+ from craft_providers.multipass import MultipassProvider
+
+ monkeypatch.setenv("SNAP_DATA", str(tmp_path / "snap_data"))
+
+ provider_svc = _make_provider_service(
+ MagicMock(spec=MultipassProvider), mock_services, mock_app
+ )
+
+ @contextlib.contextmanager
+ def fake_super_with_lxd_instance(self, build_info, *, work_dir, **kwargs):
+ yield mock_lxd_provider_instance
+
+ with patch.object(ProviderService, "instance", fake_super_with_lxd_instance):
+ with provider_svc.instance(MagicMock(), work_dir=tmp_path / "work"):
+ pass
+
+ mock_lxd_inst.save.assert_not_called()
+
+
+# ---------------------------------------------------------------------------
+# Test: full loopserver attach/detach round-trip via _LoopServerThread
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.requires_root
+def test_loopserver_attach_detach_round_trip(
+ tmp_path: pathlib.Path,
+ loop_server: tuple[pathlib.Path, _LoopServerThread],
+) -> None:
+ """Full HTTP round-trip: attach an image, verify devices added, then detach."""
+ import imagecraft.losetup.server._lxd as _lxd
+ from imagecraft.pack import gptutil
+ from unittest.mock import MagicMock
+
+ sock_path, _ = loop_server
+
+ # Create a minimal GPT image to attach.
+ image_path = tmp_path / "test.img"
+ gptutil.create_empty_gpt_image(
+ imagepath=image_path,
+ sector_size=512,
+ layout=MagicMock(structure=[], volume_schema=None),
+ )
+
+ # Fake LXD device state so we don't need a real daemon.
+ fake_devices: dict = {}
+
+ def fake_lxd_get(path: str) -> dict:
+ return {"metadata": {"expanded_devices": dict(fake_devices)}}
+
+ def fake_lxd_patch(path: str, body: dict) -> dict:
+ for name, dev in body.get("devices", {}).items():
+ if dev is None:
+ fake_devices.pop(name, None)
+ else:
+ fake_devices[name] = dev
+ return {}
+
+ with (
+ patch.object(_lxd, "lxd_get", side_effect=fake_lxd_get),
+ patch.object(_lxd, "lxd_patch", side_effect=fake_lxd_patch),
+ patch(
+ "imagecraft.losetup.server._server.convert_container_path_to_host_path",
+ return_value=image_path,
+ ),
+ ):
+ # Attach
+ resp = _http_post(sock_path, "/1.0/attach", "/root/test.img")
+ assert resp["status_code"] == 200, resp
+ attached = resp["metadata"]
+ assert attached, "expected at least the loop device in metadata"
+ loop_alias = attached[0]
+ assert loop_alias.startswith("/dev/imagecraft-loop")
+
+ # Loop device should appear in the fake device table.
+ assert any(dev.get("path") == loop_alias for dev in fake_devices.values())
+
+ # Detach
+ resp = _http_post(sock_path, "/1.0/detach", loop_alias)
+ assert resp["status_code"] == 200, resp
+ assert loop_alias in resp["metadata"]
+
+ # All unix-block devices should be removed.
+ assert not any(
+ dev.get("type") == "unix-block" for dev in fake_devices.values()
+ )
diff --git a/tests/unit/losetup/__init__.py b/tests/unit/losetup/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/unit/losetup/server/__init__.py b/tests/unit/losetup/server/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/unit/losetup/server/test_cgroup.py b/tests/unit/losetup/server/test_cgroup.py
new file mode 100644
index 00000000..bd72b254
--- /dev/null
+++ b/tests/unit/losetup/server/test_cgroup.py
@@ -0,0 +1,80 @@
+# Copyright 2026 Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Unit tests for imagecraft/losetup/server/_cgroup.py."""
+
+import pytest
+
+from imagecraft.losetup.server._cgroup import parse_lxd_location
+
+
+class TestParseLxdLocation:
+ @pytest.mark.parametrize(
+ "cgroup, expected",
+ [
+ # Happy path: project_container
+ (
+ "0::/lxc.payload.snapcraft_snapcraft-imagecraft-amd64-40774607"
+ "/user.slice/user@0.service/init.scope\n",
+ ("snapcraft", "snapcraft-imagecraft-amd64-40774607"),
+ ),
+ # Root cgroup – not in a container
+ ("0::/\n", None),
+ # User slice – no lxc.payload component
+ ("0::/user.slice/user@1000.service\n", None),
+ # Empty string
+ ("", None),
+ # No 0:: line at all
+ ("1::/some/path\n2::/other\n", None),
+ # Exact imagecraft project
+ (
+ "0::/lxc.payload.imagecraft_my-container\n",
+ ("imagecraft", "my-container"),
+ ),
+ ],
+ )
+ def test_parse_various_inputs(self, cgroup, expected):
+ assert parse_lxd_location(cgroup) == expected
+
+ def test_innermost_wins_with_nested_containers(self):
+ """Walk in reverse so the innermost lxc.payload wins."""
+ cgroup = (
+ "0::/lxc.payload.outer_cont1"
+ "/lxc.payload.inner_cont2"
+ "/some.scope\n"
+ )
+ assert parse_lxd_location(cgroup) == ("inner", "cont2")
+
+ def test_picks_0_double_colon_line_only(self):
+ """Lines that don't start with '0::' are ignored."""
+ cgroup = "0::/some/path\n1::/other\n"
+ # "some/path" has no lxc.payload component → None
+ assert parse_lxd_location(cgroup) is None
+
+ def test_multiple_lines_uses_0_line(self):
+ """When multiple cgroup lines are present, 0:: is the one parsed."""
+ cgroup = (
+ "1::/lxc.payload.sneaky_container\n"
+ "0::/lxc.payload.real_mybox\n"
+ )
+ assert parse_lxd_location(cgroup) == ("real", "mybox")
+
+ def test_no_underscore_separator_returns_none(self):
+ """If the component after lxc.payload has no underscore, skip it."""
+ cgroup = "0::/lxc.payload.nounderscorehere\n"
+ assert parse_lxd_location(cgroup) is None
+
+ def test_container_name_may_contain_hyphens(self):
+ cgroup = "0::/lxc.payload.myproject_my-long-container-name-01\n"
+ assert parse_lxd_location(cgroup) == ("myproject", "my-long-container-name-01")
diff --git a/tests/unit/losetup/server/test_lxd.py b/tests/unit/losetup/server/test_lxd.py
new file mode 100644
index 00000000..64d4b561
--- /dev/null
+++ b/tests/unit/losetup/server/test_lxd.py
@@ -0,0 +1,201 @@
+# Copyright 2026 Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Unit tests for imagecraft/losetup/server/_lxd.py."""
+
+import pathlib
+
+import pytest
+
+from imagecraft.losetup.server._lxd import (
+ convert_container_path_to_host_path,
+ find_free_loop_slot,
+)
+
+
+# ─── find_free_loop_slot ──────────────────────────────────────────────────────
+
+
+class TestFindFreeLoopSlot:
+ @pytest.mark.parametrize(
+ "devices, expected",
+ [
+ # Empty → slot 0
+ ({}, 0),
+ # Slot 0 taken (including a partition device) → slot 1
+ (
+ {
+ "imagecraft-loop0": {"type": "unix-block"},
+ "imagecraft-loop0p1": {"type": "unix-block"},
+ },
+ 1,
+ ),
+ # Gap at slot 1 → slot 1
+ (
+ {
+ "imagecraft-loop0": {"type": "unix-block"},
+ "imagecraft-loop2": {"type": "unix-block"},
+ },
+ 1,
+ ),
+ # Non-imagecraft devices are ignored
+ ({"other-device": {"type": "disk"}}, 0),
+ # Mixed: imagecraft slot 0, other devices → still slot 1
+ (
+ {
+ "imagecraft-loop0": {"type": "unix-block"},
+ "eth0": {"type": "nic"},
+ },
+ 1,
+ ),
+ # Two consecutive slots taken → slot 2
+ (
+ {
+ "imagecraft-loop0": {},
+ "imagecraft-loop1": {},
+ },
+ 2,
+ ),
+ ],
+ )
+ def test_find_free_slot(self, devices, expected):
+ assert find_free_loop_slot(devices) == expected
+
+
+# ─── convert_container_path_to_host_path ──────────────────────────────────────
+
+
+class TestConvertContainerPathToHostPath:
+ def _make_devices(self, *entries):
+ """Build an expanded_devices dict from (name, path, source) tuples."""
+ return {
+ name: {"type": "disk", "path": path, "source": source}
+ for name, path, source in entries
+ }
+
+ def _mock_lxd_get(self, mocker, devices: dict):
+ mocker.patch(
+ "imagecraft.losetup.server._lxd.lxd_get",
+ return_value={"metadata": {"expanded_devices": devices}},
+ )
+
+ def test_simple_prefix_match(self, mocker):
+ devices = self._make_devices(
+ ("disk-root", "/root/project", "/home/user/project")
+ )
+ self._mock_lxd_get(mocker, devices)
+
+ result = convert_container_path_to_host_path(
+ "myproject", "mycontainer", "/root/project/disk.img"
+ )
+
+ assert result == pathlib.Path("/home/user/project/disk.img")
+
+ def test_exact_path_match(self, mocker):
+ devices = self._make_devices(
+ ("disk-root", "/root/project", "/home/user/project")
+ )
+ self._mock_lxd_get(mocker, devices)
+
+ result = convert_container_path_to_host_path(
+ "myproject", "mycontainer", "/root/project"
+ )
+
+ assert result == pathlib.Path("/home/user/project")
+
+ def test_longest_prefix_wins(self, mocker):
+ """When two mounts overlap, the longer (more specific) one is chosen."""
+ devices = {
+ "disk-root": {"type": "disk", "path": "/root", "source": "/host/root"},
+ "disk-project": {
+ "type": "disk",
+ "path": "/root/project",
+ "source": "/home/user/project",
+ },
+ }
+ self._mock_lxd_get(mocker, devices)
+
+ result = convert_container_path_to_host_path(
+ "myproject", "mycontainer", "/root/project/foo"
+ )
+
+ assert result == pathlib.Path("/home/user/project/foo")
+
+ def test_raises_when_no_matching_device(self, mocker):
+ devices = self._make_devices(
+ ("disk-other", "/other/path", "/host/other")
+ )
+ self._mock_lxd_get(mocker, devices)
+
+ with pytest.raises(ValueError, match="not under any sourced device"):
+ convert_container_path_to_host_path(
+ "myproject", "mycontainer", "/root/project/disk.img"
+ )
+
+ def test_non_disk_device_is_ignored(self, mocker):
+ devices = {
+ "eth0": {"type": "nic", "path": "/root/project", "source": "/host/proj"},
+ "disk-root": {"type": "disk", "path": "/root", "source": "/host/root"},
+ }
+ self._mock_lxd_get(mocker, devices)
+
+ result = convert_container_path_to_host_path(
+ "myproject", "mycontainer", "/root/project/file"
+ )
+
+ # Uses /root mount, not the nic device
+ assert result == pathlib.Path("/host/root/project/file")
+
+ def test_disk_without_source_is_ignored(self, mocker):
+ devices = {
+ "disk-nosource": {"type": "disk", "path": "/root/project"},
+ "disk-root": {"type": "disk", "path": "/root", "source": "/host/root"},
+ }
+ self._mock_lxd_get(mocker, devices)
+
+ result = convert_container_path_to_host_path(
+ "myproject", "mycontainer", "/root/project/file"
+ )
+
+ # disk-nosource has no "source" so it's ignored; falls back to /root
+ assert result == pathlib.Path("/host/root/project/file")
+
+ def test_lxd_get_called_with_correct_path(self, mocker):
+ mock_get = mocker.patch(
+ "imagecraft.losetup.server._lxd.lxd_get",
+ return_value={
+ "metadata": {
+ "expanded_devices": {
+ "d": {"type": "disk", "path": "/", "source": "/host"}
+ }
+ }
+ },
+ )
+
+ convert_container_path_to_host_path("proj", "cont", "/file")
+
+ mock_get.assert_called_once_with("/1.0/instances/cont?project=proj")
+
+ def test_normpath_applied_to_container_path(self, mocker):
+ """Trailing slashes and dots are normalised before matching."""
+ devices = self._make_devices(
+ ("disk-root", "/root/project", "/host/project")
+ )
+ self._mock_lxd_get(mocker, devices)
+
+ result = convert_container_path_to_host_path(
+ "p", "c", "/root/project/"
+ )
+
+ assert result == pathlib.Path("/host/project")
diff --git a/tests/unit/losetup/server/test_server.py b/tests/unit/losetup/server/test_server.py
new file mode 100644
index 00000000..0da3e515
--- /dev/null
+++ b/tests/unit/losetup/server/test_server.py
@@ -0,0 +1,373 @@
+# Copyright 2026 Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Unit tests for imagecraft/losetup/server/_server.py."""
+
+import json
+import pathlib
+import socket
+from unittest.mock import MagicMock, call
+
+import pytest
+
+from imagecraft.losetup.server._server import (
+ _RequestHandler,
+ _handle_attach,
+ _handle_detach,
+)
+
+
+# ─── helpers ──────────────────────────────────────────────────────────────────
+
+
+def _make_http_request(
+ method: str,
+ path: str,
+ version: str = "HTTP/1.0",
+ extra_headers: str = "",
+) -> bytes:
+ return (
+ f"{method} {path} {version}\r\n"
+ f"Host: localhost\r\n"
+ f"{extra_headers}"
+ f"\r\n"
+ ).encode()
+
+
+def _run_handler(
+ project: str, container: str, request_bytes: bytes
+) -> tuple[int, dict]:
+ """
+ Feed *request_bytes* into a _RequestHandler via a real socketpair.
+ Returns (status_code, parsed_json_body).
+ """
+ client, server = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+ try:
+ client.sendall(request_bytes)
+ _RequestHandler(project, container, server, ("127.0.0.1", 0), None)
+ finally:
+ server.close()
+
+ raw = b""
+ client.settimeout(2.0)
+ try:
+ while True:
+ chunk = client.recv(4096)
+ if not chunk:
+ break
+ raw += chunk
+ except socket.timeout:
+ pass
+ finally:
+ client.close()
+
+ # Parse HTTP response: status line + headers + blank line + body
+ header_end = raw.find(b"\r\n\r\n")
+ header_section = raw[:header_end].decode()
+ body = raw[header_end + 4 :]
+ status_line = header_section.split("\r\n")[0]
+ status_code = int(status_line.split()[1])
+ return status_code, json.loads(body)
+
+
+# ─── _handle_attach ───────────────────────────────────────────────────────────
+
+
+class TestHandleAttach:
+ def _setup(
+ self,
+ mocker,
+ loop_dev="/dev/loop133",
+ partitions=("loop133p1", "loop133p2"),
+ existing_devices=None,
+ ):
+ mocker.patch(
+ "imagecraft.losetup.server._server.convert_container_path_to_host_path",
+ return_value=pathlib.Path("/host/disk.img"),
+ )
+
+ losetup_result = MagicMock()
+ losetup_result.stdout = loop_dev + "\n"
+ part_children = [{"name": p, "type": "part"} for p in partitions]
+ loop_name = pathlib.Path(loop_dev).name
+ lsblk_result = MagicMock()
+ lsblk_result.stdout = json.dumps(
+ {
+ "blockdevices": [
+ {"name": loop_name, "type": "loop", "children": part_children}
+ ]
+ }
+ )
+ mock_run = mocker.patch(
+ "imagecraft.losetup.server._server.subprocess.run",
+ side_effect=[losetup_result, lsblk_result],
+ )
+
+ current_devs = existing_devices or {}
+ mocker.patch(
+ "imagecraft.losetup.server._server.lxd_get",
+ return_value={"metadata": {"expanded_devices": current_devs}},
+ )
+ mock_patch = mocker.patch("imagecraft.losetup.server._server.lxd_patch")
+
+ return mock_run, mock_patch
+
+ def test_returns_device_paths(self, mocker):
+ self._setup(mocker)
+
+ result = _handle_attach("imagecraft", "my-container", "/root/disk.img")
+
+ assert "/dev/imagecraft-loop0" in result
+ assert "/dev/imagecraft-loop0p1" in result
+ assert "/dev/imagecraft-loop0p2" in result
+
+ def test_uses_slot_zero_when_no_existing_devices(self, mocker):
+ self._setup(mocker, existing_devices={})
+
+ result = _handle_attach("imagecraft", "my-container", "/root/disk.img")
+
+ assert all("loop0" in p for p in result)
+
+ def test_uses_next_free_slot_when_slot_zero_taken(self, mocker):
+ existing = {
+ "imagecraft-loop0": {"type": "unix-block", "path": "/dev/imagecraft-loop0"},
+ "imagecraft-loop0p1": {
+ "type": "unix-block",
+ "path": "/dev/imagecraft-loop0p1",
+ },
+ }
+ self._setup(mocker, existing_devices=existing)
+
+ result = _handle_attach("imagecraft", "my-container", "/root/disk.img")
+
+ assert any("loop1" in p for p in result)
+ assert not any("loop0p" in p for p in result) or any(
+ "loop1" in p for p in result
+ )
+
+ def test_patches_lxd_with_new_devices(self, mocker):
+ _, mock_patch = self._setup(mocker, partitions=["loop133p1"])
+
+ _handle_attach("imagecraft", "my-container", "/root/disk.img")
+
+ mock_patch.assert_called_once()
+ call_args = mock_patch.call_args
+ devices_arg = call_args[0][1]["devices"]
+ names = list(devices_arg.keys())
+ assert any("imagecraft-loop" in n for n in names)
+
+ def test_losetup_called_with_host_path(self, mocker):
+ mock_run, _ = self._setup(mocker, partitions=[])
+
+ _handle_attach("imagecraft", "my-container", "/root/disk.img")
+
+ losetup_cmd = mock_run.call_args_list[0][0][0]
+ assert losetup_cmd == [
+ "losetup",
+ "--find",
+ "--show",
+ "--partscan",
+ "/host/disk.img",
+ ]
+
+ def test_no_partitions(self, mocker):
+ self._setup(mocker, partitions=[])
+
+ result = _handle_attach("imagecraft", "my-container", "/root/disk.img")
+
+ assert result == ["/dev/imagecraft-loop0"]
+
+
+# ─── _handle_detach ───────────────────────────────────────────────────────────
+
+
+class TestHandleDetach:
+ def _make_devices(self, loop_alias="imagecraft-loop0", loop_dev="/dev/loop133"):
+ return {
+ loop_alias: {
+ "type": "unix-block",
+ "path": f"/dev/{loop_alias}",
+ "source": loop_dev,
+ },
+ f"{loop_alias}p1": {
+ "type": "unix-block",
+ "path": f"/dev/{loop_alias}p1",
+ "source": f"{loop_dev}p1",
+ },
+ f"{loop_alias}p2": {
+ "type": "unix-block",
+ "path": f"/dev/{loop_alias}p2",
+ "source": f"{loop_dev}p2",
+ },
+ }
+
+ def _setup(self, mocker, devices=None):
+ if devices is None:
+ devices = self._make_devices()
+ mocker.patch(
+ "imagecraft.losetup.server._server.lxd_get",
+ return_value={"metadata": {"expanded_devices": devices}},
+ )
+ mock_patch = mocker.patch("imagecraft.losetup.server._server.lxd_patch")
+ mock_run = mocker.patch("imagecraft.losetup.server._server.subprocess.run")
+ return mock_patch, mock_run
+
+ def test_returns_removed_paths(self, mocker):
+ mock_patch, _ = self._setup(mocker)
+
+ result = _handle_detach(
+ "imagecraft", "my-container", "/dev/imagecraft-loop0"
+ )
+
+ assert set(result) == {
+ "/dev/imagecraft-loop0",
+ "/dev/imagecraft-loop0p1",
+ "/dev/imagecraft-loop0p2",
+ }
+
+ def test_runs_losetup_detach_on_host_device(self, mocker):
+ _, mock_run = self._setup(mocker)
+
+ _handle_detach("imagecraft", "my-container", "/dev/imagecraft-loop0")
+
+ mock_run.assert_called_once_with(
+ ["losetup", "--detach", "/dev/loop133"], check=True
+ )
+
+ def test_patches_lxd_with_none_to_remove_devices(self, mocker):
+ mock_patch, _ = self._setup(mocker)
+
+ _handle_detach("imagecraft", "my-container", "/dev/imagecraft-loop0")
+
+ mock_patch.assert_called_once()
+ devices_arg = mock_patch.call_args[0][1]["devices"]
+ # All removed devices mapped to None
+ assert all(v is None for v in devices_arg.values())
+ assert "imagecraft-loop0" in devices_arg
+
+ def test_raises_when_path_not_found(self, mocker):
+ self._setup(mocker)
+
+ with pytest.raises(ValueError, match="no unix-block device with path"):
+ _handle_detach(
+ "imagecraft", "my-container", "/dev/imagecraft-loop99"
+ )
+
+ def test_removes_partition_devices_with_loop_prefix(self, mocker):
+ """Partition sources like /dev/loop133p1 are matched by prefix."""
+ mock_patch, _ = self._setup(mocker)
+
+ _handle_detach("imagecraft", "my-container", "/dev/imagecraft-loop0")
+
+ devices_removed = mock_patch.call_args[0][1]["devices"]
+ # Both the main device and its partitions should be removed
+ assert "imagecraft-loop0p1" in devices_removed
+ assert "imagecraft-loop0p2" in devices_removed
+
+ def test_does_not_remove_unrelated_devices(self, mocker):
+ devices = {
+ **self._make_devices("imagecraft-loop0", "/dev/loop133"),
+ "imagecraft-loop1": {
+ "type": "unix-block",
+ "path": "/dev/imagecraft-loop1",
+ "source": "/dev/loop134",
+ },
+ }
+ mock_patch, _ = self._setup(mocker, devices=devices)
+
+ _handle_detach("imagecraft", "my-container", "/dev/imagecraft-loop0")
+
+ devices_removed = mock_patch.call_args[0][1]["devices"]
+ assert "imagecraft-loop1" not in devices_removed
+
+
+# ─── _RequestHandler (via socket pair) ────────────────────────────────────────
+
+
+class TestRequestHandler:
+ def test_valid_attach_returns_200(self, mocker):
+ mocker.patch(
+ "imagecraft.losetup.server._server._handle_attach",
+ return_value=["/dev/imagecraft-loop0", "/dev/imagecraft-loop0p1"],
+ )
+
+ req = _make_http_request("POST", "/1.0/attach?path=/root/disk.img")
+ code, body = _run_handler("imagecraft", "my-container", req)
+
+ assert code == 200
+ assert body["metadata"] == ["/dev/imagecraft-loop0", "/dev/imagecraft-loop0p1"]
+ assert body["status_code"] == 200
+
+ def test_valid_detach_returns_200(self, mocker):
+ mocker.patch(
+ "imagecraft.losetup.server._server._handle_detach",
+ return_value=["/dev/imagecraft-loop0"],
+ )
+
+ req = _make_http_request("POST", "/1.0/detach?path=/dev/imagecraft-loop0")
+ code, body = _run_handler("imagecraft", "my-container", req)
+
+ assert code == 200
+ assert body["metadata"] == ["/dev/imagecraft-loop0"]
+
+ def test_missing_path_param_returns_400(self, mocker):
+ req = _make_http_request("POST", "/1.0/attach")
+ code, body = _run_handler("imagecraft", "my-container", req)
+
+ assert code == 400
+ assert "path" in body["error"]
+
+ def test_unknown_endpoint_returns_404(self, mocker):
+ req = _make_http_request("POST", "/1.0/unknown?path=/x")
+ code, body = _run_handler("imagecraft", "my-container", req)
+
+ assert code == 404
+
+ def test_handler_exception_returns_500(self, mocker):
+ mocker.patch(
+ "imagecraft.losetup.server._server._handle_attach",
+ side_effect=RuntimeError("unexpected failure"),
+ )
+
+ req = _make_http_request("POST", "/1.0/attach?path=/root/disk.img")
+ code, body = _run_handler("imagecraft", "my-container", req)
+
+ assert code == 500
+ assert "unexpected failure" in body["error"]
+
+ def test_attach_receives_correct_args(self, mocker):
+ mock_attach = mocker.patch(
+ "imagecraft.losetup.server._server._handle_attach",
+ return_value=[],
+ )
+
+ req = _make_http_request("POST", "/1.0/attach?path=/root/disk.img")
+ _run_handler("imagecraft", "my-container", req)
+
+ mock_attach.assert_called_once_with(
+ "imagecraft", "my-container", "/root/disk.img"
+ )
+
+ def test_detach_receives_correct_args(self, mocker):
+ mock_detach = mocker.patch(
+ "imagecraft.losetup.server._server._handle_detach",
+ return_value=[],
+ )
+
+ req = _make_http_request("POST", "/1.0/detach?path=/dev/imagecraft-loop0")
+ _run_handler("imagecraft", "my-container", req)
+
+ mock_detach.assert_called_once_with(
+ "imagecraft", "my-container", "/dev/imagecraft-loop0"
+ )
diff --git a/tests/unit/losetup/test_client.py b/tests/unit/losetup/test_client.py
new file mode 100644
index 00000000..4e3f4980
--- /dev/null
+++ b/tests/unit/losetup/test_client.py
@@ -0,0 +1,451 @@
+# Copyright 2026 Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Unit tests for imagecraft/losetup/__init__.py (the losetup client)."""
+
+import json
+import pathlib
+import subprocess
+from unittest.mock import MagicMock
+
+import pytest
+
+import imagecraft.losetup as losetup_module
+from imagecraft.losetup import (
+ _LXD_GUEST_SOCK,
+ _LOOPSERVER_SOCK,
+ _is_lxd_container,
+ _loopserver_request,
+ _losetup_attach,
+ _losetup_detach,
+ attach,
+ detach,
+)
+
+
+# ─── constants ────────────────────────────────────────────────────────────────
+
+
+def test_lxd_guest_sock_constant():
+ assert _LXD_GUEST_SOCK == "/dev/lxd/sock"
+
+
+def test_loopserver_sock_constant():
+ assert _LOOPSERVER_SOCK == "/dev/losetup/sock"
+
+
+# ─── _is_lxd_container ────────────────────────────────────────────────────────
+
+
+class TestIsLxdContainer:
+ def test_returns_false_when_sock_missing(self, mocker, tmp_path):
+ mocker.patch("imagecraft.losetup._LXD_GUEST_SOCK", str(tmp_path / "nonexistent"))
+ mocker.patch("craft_cli.emit.debug")
+ assert _is_lxd_container() is False
+
+ def test_returns_true_when_instance_type_container(self, mocker, tmp_path):
+ sock_path = tmp_path / "sock"
+ sock_path.touch()
+ mocker.patch("imagecraft.losetup._LXD_GUEST_SOCK", str(sock_path))
+ mocker.patch("craft_cli.emit.debug")
+
+ mock_response = MagicMock()
+ mock_response.read.return_value = json.dumps(
+ {"instance_type": "container"}
+ ).encode()
+ mock_conn = MagicMock()
+ mock_conn.getresponse.return_value = mock_response
+ mocker.patch(
+ "imagecraft.losetup.http.client.HTTPConnection", return_value=mock_conn
+ )
+ mocker.patch("imagecraft.losetup.socket.socket")
+
+ assert _is_lxd_container() is True
+
+ def test_returns_false_when_instance_type_vm(self, mocker, tmp_path):
+ sock_path = tmp_path / "sock"
+ sock_path.touch()
+ mocker.patch("imagecraft.losetup._LXD_GUEST_SOCK", str(sock_path))
+ mocker.patch("craft_cli.emit.debug")
+
+ mock_response = MagicMock()
+ mock_response.read.return_value = json.dumps(
+ {"instance_type": "virtual-machine"}
+ ).encode()
+ mock_conn = MagicMock()
+ mock_conn.getresponse.return_value = mock_response
+ mocker.patch(
+ "imagecraft.losetup.http.client.HTTPConnection", return_value=mock_conn
+ )
+ mocker.patch("imagecraft.losetup.socket.socket")
+
+ assert _is_lxd_container() is False
+
+ def test_returns_false_when_instance_type_missing(self, mocker, tmp_path):
+ sock_path = tmp_path / "sock"
+ sock_path.touch()
+ mocker.patch("imagecraft.losetup._LXD_GUEST_SOCK", str(sock_path))
+ mocker.patch("craft_cli.emit.debug")
+
+ mock_response = MagicMock()
+ mock_response.read.return_value = json.dumps({}).encode()
+ mock_conn = MagicMock()
+ mock_conn.getresponse.return_value = mock_response
+ mocker.patch(
+ "imagecraft.losetup.http.client.HTTPConnection", return_value=mock_conn
+ )
+ mocker.patch("imagecraft.losetup.socket.socket")
+
+ assert _is_lxd_container() is False
+
+ def test_returns_false_on_connection_error(self, mocker, tmp_path):
+ sock_path = tmp_path / "sock"
+ sock_path.touch()
+ mocker.patch("imagecraft.losetup._LXD_GUEST_SOCK", str(sock_path))
+ mocker.patch("craft_cli.emit.debug")
+
+ mock_sock = MagicMock()
+ mock_sock.connect.side_effect = ConnectionRefusedError("refused")
+ mocker.patch("imagecraft.losetup.socket.socket", return_value=mock_sock)
+ mocker.patch("imagecraft.losetup.http.client.HTTPConnection")
+
+ assert _is_lxd_container() is False
+
+ def test_returns_false_on_json_error(self, mocker, tmp_path):
+ sock_path = tmp_path / "sock"
+ sock_path.touch()
+ mocker.patch("imagecraft.losetup._LXD_GUEST_SOCK", str(sock_path))
+ mocker.patch("craft_cli.emit.debug")
+
+ mock_response = MagicMock()
+ mock_response.read.return_value = b"not-json"
+ mock_conn = MagicMock()
+ mock_conn.getresponse.return_value = mock_response
+ mocker.patch(
+ "imagecraft.losetup.http.client.HTTPConnection", return_value=mock_conn
+ )
+ mocker.patch("imagecraft.losetup.socket.socket")
+
+ assert _is_lxd_container() is False
+
+ def test_never_raises_on_connection_failure(self, mocker, tmp_path):
+ """Exceptions during the HTTP call (inside the try block) never propagate."""
+ sock_path = tmp_path / "sock"
+ sock_path.touch()
+ mocker.patch("imagecraft.losetup._LXD_GUEST_SOCK", str(sock_path))
+ mocker.patch("craft_cli.emit.debug")
+
+ mock_conn = MagicMock()
+ mock_conn.getresponse.side_effect = OSError("pipe broken")
+ mocker.patch(
+ "imagecraft.losetup.http.client.HTTPConnection", return_value=mock_conn
+ )
+ mocker.patch("imagecraft.losetup.socket.socket")
+
+ result = _is_lxd_container()
+ assert result is False
+
+
+# ─── _loopserver_request ──────────────────────────────────────────────────────
+
+
+class TestLoopserverRequest:
+ def _make_conn(self, mocker, response_data: dict):
+ mock_response = MagicMock()
+ mock_response.read.return_value = json.dumps(response_data).encode()
+ mock_conn = MagicMock()
+ mock_conn.getresponse.return_value = mock_response
+ mocker.patch(
+ "imagecraft.losetup.http.client.HTTPConnection", return_value=mock_conn
+ )
+ mocker.patch("imagecraft.losetup.socket.socket")
+ return mock_conn
+
+ def test_returns_metadata_on_success(self, mocker):
+ expected = ["/dev/imagecraft-loop0", "/dev/imagecraft-loop0p1"]
+ self._make_conn(mocker, {"status_code": 200, "metadata": expected})
+
+ result = _loopserver_request("/1.0/attach", "/root/disk.img")
+
+ assert result == expected
+
+ def test_raises_on_error_code(self, mocker):
+ self._make_conn(
+ mocker,
+ {"error_code": 500, "error": "losetup failed"},
+ )
+
+ with pytest.raises(RuntimeError, match="losetup failed"):
+ _loopserver_request("/1.0/attach", "/root/disk.img")
+
+ def test_raises_with_fallback_message_when_no_error_field(self, mocker):
+ self._make_conn(mocker, {"error_code": 500})
+
+ with pytest.raises(RuntimeError, match="unknown error from loopserver"):
+ _loopserver_request("/1.0/attach", "/root/disk.img")
+
+ def test_connects_to_loopserver_sock(self, mocker):
+ mock_sock = MagicMock()
+ mocker.patch("imagecraft.losetup.socket.socket", return_value=mock_sock)
+ mock_conn = MagicMock()
+ mock_conn.getresponse.return_value.read.return_value = json.dumps(
+ {"metadata": []}
+ ).encode()
+ mocker.patch(
+ "imagecraft.losetup.http.client.HTTPConnection", return_value=mock_conn
+ )
+
+ _loopserver_request("/1.0/attach", "/some/path")
+
+ mock_sock.connect.assert_called_once_with(_LOOPSERVER_SOCK)
+
+ def test_path_encoded_in_query(self, mocker):
+ mock_sock = MagicMock()
+ mocker.patch("imagecraft.losetup.socket.socket", return_value=mock_sock)
+ mock_conn = MagicMock()
+ mock_conn.getresponse.return_value.read.return_value = json.dumps(
+ {"metadata": []}
+ ).encode()
+ mocker.patch(
+ "imagecraft.losetup.http.client.HTTPConnection", return_value=mock_conn
+ )
+
+ _loopserver_request("/1.0/attach", "/root/my image.img")
+
+ url = mock_conn.request.call_args[0][1]
+ assert "path=%2Froot%2Fmy+image.img" in url or "path=%2Froot%2Fmy%20image.img" in url
+
+
+# ─── _losetup_attach ──────────────────────────────────────────────────────────
+
+
+class TestLosetupAttach:
+ def _make_run(self, mocker, loop_dev="/dev/loop8", partitions=None):
+ partitions = partitions or []
+ children = [{"name": f"loop8{p}", "type": "part"} for p in partitions]
+ lsblk_output = json.dumps(
+ {
+ "blockdevices": [
+ {"name": "loop8", "type": "loop", "children": children}
+ ]
+ }
+ )
+ losetup_result = MagicMock()
+ losetup_result.stdout = loop_dev + "\n"
+ lsblk_result = MagicMock()
+ lsblk_result.stdout = lsblk_output
+
+ return mocker.patch(
+ "imagecraft.losetup.subprocess.run",
+ side_effect=[losetup_result, lsblk_result],
+ )
+
+ def test_returns_loop_device_no_partitions(self, mocker, tmp_path):
+ img = tmp_path / "disk.img"
+ self._make_run(mocker, loop_dev="/dev/loop8", partitions=[])
+
+ result = _losetup_attach(img)
+
+ assert result == ["/dev/loop8"]
+
+ def test_returns_loop_and_partitions(self, mocker, tmp_path):
+ img = tmp_path / "disk.img"
+ self._make_run(mocker, loop_dev="/dev/loop8", partitions=["p1", "p2"])
+
+ result = _losetup_attach(img)
+
+ assert result == ["/dev/loop8", "/dev/loop8p1", "/dev/loop8p2"]
+
+ def test_calls_losetup_find_show_partscan(self, mocker, tmp_path):
+ img = tmp_path / "disk.img"
+ mock_run = self._make_run(mocker, loop_dev="/dev/loop8")
+
+ _losetup_attach(img)
+
+ losetup_call = mock_run.call_args_list[0]
+ assert losetup_call[0][0] == [
+ "losetup",
+ "--find",
+ "--show",
+ "--partscan",
+ str(img),
+ ]
+
+ def test_calls_lsblk_on_loop_device(self, mocker, tmp_path):
+ img = tmp_path / "disk.img"
+ mock_run = self._make_run(mocker, loop_dev="/dev/loop8")
+
+ _losetup_attach(img)
+
+ lsblk_call = mock_run.call_args_list[1]
+ cmd = lsblk_call[0][0]
+ assert "lsblk" in cmd
+ assert "/dev/loop8" in cmd
+
+ def test_ignores_non_part_children(self, mocker, tmp_path):
+ img = tmp_path / "disk.img"
+ lsblk_output = json.dumps(
+ {
+ "blockdevices": [
+ {
+ "name": "loop8",
+ "type": "loop",
+ "children": [
+ {"name": "loop8p1", "type": "part"},
+ {"name": "dm-0", "type": "dm"}, # not a partition
+ ],
+ }
+ ]
+ }
+ )
+ losetup_result = MagicMock()
+ losetup_result.stdout = "/dev/loop8\n"
+ lsblk_result = MagicMock()
+ lsblk_result.stdout = lsblk_output
+ mocker.patch(
+ "imagecraft.losetup.subprocess.run",
+ side_effect=[losetup_result, lsblk_result],
+ )
+
+ result = _losetup_attach(img)
+
+ assert "/dev/dm-0" not in result
+ assert "/dev/loop8p1" in result
+
+
+# ─── _losetup_detach ──────────────────────────────────────────────────────────
+
+
+class TestLosetupDetach:
+ def _setup(self, mocker, device="/dev/loop8", partitions=None):
+ partitions = partitions or []
+ children = [{"name": f"loop8{p}", "type": "part"} for p in partitions]
+ lsblk_output = json.dumps(
+ {"blockdevices": [{"name": "loop8", "type": "loop", "children": children}]}
+ )
+ lsblk_result = MagicMock()
+ lsblk_result.stdout = lsblk_output
+ detach_result = MagicMock()
+
+ return mocker.patch(
+ "imagecraft.losetup.subprocess.run",
+ side_effect=[lsblk_result, detach_result],
+ )
+
+ def test_returns_device_no_partitions(self, mocker):
+ self._setup(mocker, device="/dev/loop8", partitions=[])
+
+ result = _losetup_detach("/dev/loop8")
+
+ assert result == ["/dev/loop8"]
+
+ def test_returns_device_and_partitions(self, mocker):
+ self._setup(mocker, device="/dev/loop8", partitions=["p1", "p2"])
+
+ result = _losetup_detach("/dev/loop8")
+
+ assert result == ["/dev/loop8", "/dev/loop8p1", "/dev/loop8p2"]
+
+ def test_calls_losetup_detach(self, mocker):
+ mock_run = self._setup(mocker)
+
+ _losetup_detach("/dev/loop8")
+
+ detach_call = mock_run.call_args_list[1]
+ assert detach_call[0][0] == ["losetup", "--detach", "/dev/loop8"]
+
+ def test_ignores_non_part_children(self, mocker):
+ lsblk_output = json.dumps(
+ {
+ "blockdevices": [
+ {
+ "name": "loop8",
+ "type": "loop",
+ "children": [
+ {"name": "loop8p1", "type": "part"},
+ {"name": "dm-0", "type": "dm"},
+ ],
+ }
+ ]
+ }
+ )
+ lsblk_result = MagicMock()
+ lsblk_result.stdout = lsblk_output
+ mocker.patch(
+ "imagecraft.losetup.subprocess.run",
+ side_effect=[lsblk_result, MagicMock()],
+ )
+
+ result = _losetup_detach("/dev/loop8")
+
+ assert "/dev/dm-0" not in result
+
+
+# ─── attach / detach (high-level) ─────────────────────────────────────────────
+
+
+class TestAttach:
+ def test_delegates_to_loopserver_when_lxd_container(self, mocker):
+ mocker.patch("imagecraft.losetup._is_lxd_container", return_value=True)
+ mock_req = mocker.patch(
+ "imagecraft.losetup._loopserver_request",
+ return_value=["/dev/imagecraft-loop0"],
+ )
+ mocker.patch("craft_cli.emit.debug")
+
+ result = attach(pathlib.Path("/root/disk.img"))
+
+ mock_req.assert_called_once_with("/1.0/attach", "/root/disk.img")
+ assert result == ["/dev/imagecraft-loop0"]
+
+ def test_delegates_to_losetup_when_not_container(self, mocker):
+ mocker.patch("imagecraft.losetup._is_lxd_container", return_value=False)
+ mock_attach = mocker.patch(
+ "imagecraft.losetup._losetup_attach",
+ return_value=["/dev/loop8"],
+ )
+ mocker.patch("craft_cli.emit.debug")
+
+ result = attach(pathlib.Path("/root/disk.img"))
+
+ mock_attach.assert_called_once_with(pathlib.Path("/root/disk.img"))
+ assert result == ["/dev/loop8"]
+
+
+class TestDetach:
+ def test_delegates_to_loopserver_when_lxd_container(self, mocker):
+ mocker.patch("imagecraft.losetup._is_lxd_container", return_value=True)
+ mock_req = mocker.patch(
+ "imagecraft.losetup._loopserver_request",
+ return_value=["/dev/imagecraft-loop0"],
+ )
+ mocker.patch("craft_cli.emit.debug")
+
+ result = detach("/dev/imagecraft-loop0")
+
+ mock_req.assert_called_once_with("/1.0/detach", "/dev/imagecraft-loop0")
+ assert result == ["/dev/imagecraft-loop0"]
+
+ def test_delegates_to_losetup_when_not_container(self, mocker):
+ mocker.patch("imagecraft.losetup._is_lxd_container", return_value=False)
+ mock_detach = mocker.patch(
+ "imagecraft.losetup._losetup_detach",
+ return_value=["/dev/loop8"],
+ )
+ mocker.patch("craft_cli.emit.debug")
+
+ result = detach("/dev/loop8")
+
+ mock_detach.assert_called_once_with("/dev/loop8")
+ assert result == ["/dev/loop8"]
diff --git a/tests/unit/services/test_image.py b/tests/unit/services/test_image.py
index 221d84af..92347c38 100644
--- a/tests/unit/services/test_image.py
+++ b/tests/unit/services/test_image.py
@@ -12,7 +12,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-import subprocess
from unittest.mock import MagicMock, patch
import pytest
@@ -99,95 +98,53 @@ def test_create_images_idempotent(image_service, mock_services, mock_project):
def test_attach_images_new(image_service, project_dir, mocker):
image_service._images = {"pc": project_dir / ".pc.img.tmp"}
- mock_run = mocker.patch("imagecraft.services.image.run")
- # Mock _get_all_loop_devices returns empty
- mocker.patch.object(image_service, "_get_all_loop_devices", return_value=[])
-
- mock_run.return_value.stdout = "/dev/loop8\n"
+ mock_attach = mocker.patch(
+ "imagecraft.losetup.attach",
+ return_value=["/dev/loop8"],
+ )
with patch("atexit.register") as mock_atexit:
devices = image_service.attach_images()
assert devices == {"pc": "/dev/loop8"}
- mock_run.assert_called_with(
- "losetup",
- "--find",
- "--show",
- "--partscan",
- str(project_dir / ".pc.img.tmp"),
- )
+ mock_attach.assert_called_once_with(project_dir / ".pc.img.tmp")
mock_atexit.assert_called_once_with(image_service.detach_images)
-def test_attach_images_reuse(image_service, project_dir, mocker):
- image_path = project_dir / ".pc.img.tmp"
- image_path.touch()
- image_service._images = {"pc": image_path}
-
- # Mock existing loop device
- mocker.patch.object(
- image_service,
- "_get_all_loop_devices",
- return_value=[{"name": "/dev/loop9", "back-file": str(image_path)}],
- )
-
- # Mock samefile to return True
- mocker.patch("pathlib.Path.samefile", return_value=True)
- mock_run = mocker.patch("imagecraft.services.image.run")
-
- devices = image_service.attach_images()
-
- assert devices == {"pc": "/dev/loop9"}
- mock_run.assert_not_called() # Should not call losetup attach
-
-
-def test_attach_images_stale_inode(image_service, project_dir, mocker):
- image_path = project_dir / ".pc.img.tmp"
- image_path.touch()
- image_service._images = {"pc": image_path}
+def test_attach_images_idempotent(image_service, project_dir, mocker):
+ image_service._images = {"pc": project_dir / ".pc.img.tmp"}
- # Mock existing loop device
- mocker.patch.object(
- image_service,
- "_get_all_loop_devices",
- return_value=[{"name": "/dev/loop10", "back-file": str(image_path)}],
+ mock_attach = mocker.patch(
+ "imagecraft.losetup.attach",
+ return_value=["/dev/loop8"],
)
- # Mock samefile to raise FileNotFoundError (stale inode)
- mocker.patch("pathlib.Path.samefile", side_effect=FileNotFoundError)
- mock_run = mocker.patch("imagecraft.services.image.run")
- mock_run.return_value.stdout = "/dev/loop11\n"
-
- devices = image_service.attach_images()
+ first = image_service.attach_images()
+ second = image_service.attach_images()
- assert devices == {"pc": "/dev/loop11"}
- # Should detach stale
- mock_run.assert_any_call("losetup", "-d", "/dev/loop10")
- # Should attach new
- mock_run.assert_any_call(
- "losetup", "--find", "--show", "--partscan", str(image_path)
- )
+ assert first == second == {"pc": "/dev/loop8"}
+ mock_attach.assert_called_once() # only called the first time
def test_detach_images_success(image_service, mocker):
image_service._loop_devices = {"pc": "/dev/loop8"}
- mock_run = mocker.patch("imagecraft.services.image.run")
+ mock_detach = mocker.patch("imagecraft.losetup.detach")
image_service.detach_images()
- mock_run.assert_called_once_with("losetup", "-d", "/dev/loop8")
+ mock_detach.assert_called_once_with("/dev/loop8")
assert image_service._loop_devices == {}
def test_detach_images_retry(image_service, mocker):
image_service._loop_devices = {"pc": "/dev/loop8"}
- mock_run = mocker.patch("imagecraft.services.image.run")
+ mock_detach = mocker.patch("imagecraft.losetup.detach")
# Fail twice, then succeed
- mock_run.side_effect = [
- subprocess.CalledProcessError(1, "losetup"),
- subprocess.CalledProcessError(1, "losetup"),
- MagicMock(),
+ mock_detach.side_effect = [
+ RuntimeError("device busy"),
+ RuntimeError("device busy"),
+ None,
]
mocker.patch("time.monotonic", side_effect=[0, 1, 2, 3, 4])
@@ -195,7 +152,7 @@ def test_detach_images_retry(image_service, mocker):
image_service.detach_images()
- assert mock_run.call_count == 3
+ assert mock_detach.call_count == 3
assert image_service._loop_devices == {}
diff --git a/tests/unit/services/test_provider.py b/tests/unit/services/test_provider.py
new file mode 100644
index 00000000..a263ed61
--- /dev/null
+++ b/tests/unit/services/test_provider.py
@@ -0,0 +1,180 @@
+# Copyright 2026 Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Unit tests for imagecraft/services/provider.py – Provider.instance override."""
+
+import pathlib
+from contextlib import contextmanager
+from unittest.mock import MagicMock
+
+import pytest
+from craft_application import AppMetadata, ServiceFactory
+from craft_application.services.provider import ProviderService
+from craft_providers.lxd import LXDInstance, LXDProvider
+from craft_providers.multipass import MultipassProvider
+
+from imagecraft.services.provider import Provider
+
+
+# ─── fixtures ─────────────────────────────────────────────────────────────────
+
+
+@pytest.fixture
+def mock_lxd_inst():
+ """A mock pylxd instance (returned by _client.instances.get())."""
+ inst = MagicMock()
+ inst.devices = {}
+ return inst
+
+
+@pytest.fixture
+def mock_instance(mock_lxd_inst):
+ """A mock LXDInstance with a wired-up pylxd _client."""
+ m = MagicMock(spec=LXDInstance)
+ m.instance_name = "test-container"
+ m._client = MagicMock()
+ m._client.instances.get.return_value = mock_lxd_inst
+ return m
+
+
+@pytest.fixture
+def provider(tmp_path):
+ app = MagicMock(spec=AppMetadata)
+ services = MagicMock(spec=ServiceFactory)
+ return Provider(app, services, work_dir=tmp_path)
+
+
+# ─── helpers ──────────────────────────────────────────────────────────────────
+
+
+def _patch_super_instance(mocker, mock_inst):
+ """Patch ProviderService.instance to yield *mock_inst*."""
+
+ @contextmanager
+ def fake_instance(self, build_info, *, work_dir, **kwargs):
+ yield mock_inst
+
+ mocker.patch.object(ProviderService, "instance", fake_instance)
+
+
+# ─── tests ────────────────────────────────────────────────────────────────────
+
+
+class TestProviderInstance:
+ def test_lxd_with_snap_data_mounts_losetup(
+ self, mocker, provider, mock_instance, mock_lxd_inst, tmp_path, monkeypatch
+ ):
+ monkeypatch.setenv("SNAP_DATA", str(tmp_path))
+ _patch_super_instance(mocker, mock_instance)
+ mocker.patch.object(
+ provider, "get_provider", return_value=MagicMock(spec=LXDProvider)
+ )
+ mocker.patch("craft_cli.emit.debug")
+
+ build_info = MagicMock()
+ with provider.instance(build_info, work_dir=tmp_path) as inst:
+ assert inst is mock_instance
+
+ assert "disk-dev-losetup" in mock_lxd_inst.devices
+ device = mock_lxd_inst.devices["disk-dev-losetup"]
+ assert device["type"] == "disk"
+ assert device["path"] == "/dev/losetup"
+ assert device["source"] == str(tmp_path / "losetup")
+ assert device["shift"] == "true"
+ mock_lxd_inst.save.assert_called_once_with(wait=True)
+
+ def test_lxd_with_snap_data_creates_losetup_dir(
+ self, mocker, provider, mock_instance, tmp_path, monkeypatch
+ ):
+ monkeypatch.setenv("SNAP_DATA", str(tmp_path))
+ _patch_super_instance(mocker, mock_instance)
+ mocker.patch.object(
+ provider, "get_provider", return_value=MagicMock(spec=LXDProvider)
+ )
+ mocker.patch("craft_cli.emit.debug")
+
+ build_info = MagicMock()
+ with provider.instance(build_info, work_dir=tmp_path):
+ pass
+
+ assert (tmp_path / "losetup").is_dir()
+
+ def test_lxd_without_snap_data_does_not_mount(
+ self, mocker, provider, mock_instance, mock_lxd_inst, tmp_path, monkeypatch
+ ):
+ monkeypatch.delenv("SNAP_DATA", raising=False)
+ _patch_super_instance(mocker, mock_instance)
+ mocker.patch.object(
+ provider, "get_provider", return_value=MagicMock(spec=LXDProvider)
+ )
+ mocker.patch("craft_cli.emit.debug")
+
+ build_info = MagicMock()
+ with provider.instance(build_info, work_dir=tmp_path) as inst:
+ assert inst is mock_instance
+
+ mock_lxd_inst.save.assert_not_called()
+
+ def test_non_lxd_provider_does_not_mount(
+ self, mocker, provider, mock_instance, mock_lxd_inst, tmp_path, monkeypatch
+ ):
+ monkeypatch.setenv("SNAP_DATA", str(tmp_path))
+ _patch_super_instance(mocker, mock_instance)
+ mocker.patch.object(
+ provider, "get_provider", return_value=MagicMock(spec=MultipassProvider)
+ )
+ mocker.patch("craft_cli.emit.debug")
+
+ build_info = MagicMock()
+ with provider.instance(build_info, work_dir=tmp_path) as inst:
+ assert inst is mock_instance
+
+ mock_lxd_inst.save.assert_not_called()
+
+ def test_already_mounted_does_not_add_duplicate(
+ self, mocker, provider, mock_instance, mock_lxd_inst, tmp_path, monkeypatch
+ ):
+ """If disk-dev-losetup already exists in devices, save() is not called again."""
+ monkeypatch.setenv("SNAP_DATA", str(tmp_path))
+ _patch_super_instance(mocker, mock_instance)
+ mocker.patch.object(
+ provider, "get_provider", return_value=MagicMock(spec=LXDProvider)
+ )
+ mocker.patch("craft_cli.emit.debug")
+ mock_lxd_inst.devices["disk-dev-losetup"] = {
+ "type": "disk",
+ "path": "/dev/losetup",
+ "source": str(tmp_path / "losetup"),
+ "shift": "true",
+ }
+
+ build_info = MagicMock()
+ with provider.instance(build_info, work_dir=tmp_path):
+ pass
+
+ mock_lxd_inst.save.assert_not_called()
+
+ def test_yields_instance_from_super(
+ self, mocker, provider, mock_instance, tmp_path, monkeypatch
+ ):
+ monkeypatch.delenv("SNAP_DATA", raising=False)
+ _patch_super_instance(mocker, mock_instance)
+ mocker.patch.object(
+ provider, "get_provider", return_value=MagicMock(spec=MultipassProvider)
+ )
+ mocker.patch("craft_cli.emit.debug")
+
+ build_info = MagicMock()
+ with provider.instance(build_info, work_dir=tmp_path) as inst:
+ assert inst is mock_instance
diff --git a/tests/unit/test_configure.py b/tests/unit/test_configure.py
new file mode 100644
index 00000000..121c9a30
--- /dev/null
+++ b/tests/unit/test_configure.py
@@ -0,0 +1,90 @@
+# Copyright 2026 Canonical Ltd.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Unit tests for snap/hooks/configure – setup_losetup_dir()."""
+
+import importlib.util
+import os
+import pathlib
+import sys
+import types
+
+import pytest
+
+
+def _load_configure_hook() -> types.ModuleType:
+ """Load snap/hooks/configure as a Python module (no .py extension)."""
+ import importlib.machinery
+
+ hook_path = (
+ pathlib.Path(__file__).parent.parent.parent.parent / "snap" / "hooks" / "configure"
+ )
+ loader = importlib.machinery.SourceFileLoader("snap_configure", str(hook_path))
+ spec = importlib.util.spec_from_loader("snap_configure", loader)
+ assert spec is not None
+ module = importlib.util.module_from_spec(spec)
+ loader.exec_module(module)
+ return module
+
+
+@pytest.fixture(scope="module")
+def configure_module():
+ return _load_configure_hook()
+
+
+class TestSetupLosetupDir:
+ def test_creates_directory(self, configure_module, tmp_path, monkeypatch):
+ monkeypatch.setenv("SNAP_DATA", str(tmp_path))
+
+ configure_module.setup_losetup_dir()
+
+ assert (tmp_path / "losetup").is_dir()
+
+ def test_does_not_raise_if_directory_exists(
+ self, configure_module, tmp_path, monkeypatch
+ ):
+ monkeypatch.setenv("SNAP_DATA", str(tmp_path))
+ (tmp_path / "losetup").mkdir() # pre-create
+
+ # Should not raise
+ configure_module.setup_losetup_dir()
+
+ def test_chowns_when_lxd_group_exists(
+ self, configure_module, tmp_path, monkeypatch, mocker
+ ):
+ monkeypatch.setenv("SNAP_DATA", str(tmp_path))
+
+ mock_gr = mocker.MagicMock()
+ mock_gr.gr_gid = 1234
+ mocker.patch.object(configure_module.grp, "getgrnam", return_value=mock_gr)
+ mock_chown = mocker.patch.object(configure_module.os, "chown")
+
+ configure_module.setup_losetup_dir()
+
+ losetup_dir = os.path.join(str(tmp_path), "losetup")
+ mock_chown.assert_called_once_with(losetup_dir, -1, 1234)
+
+ def test_no_chown_when_lxd_group_missing(
+ self, configure_module, tmp_path, monkeypatch, mocker
+ ):
+ monkeypatch.setenv("SNAP_DATA", str(tmp_path))
+
+ mocker.patch.object(
+ configure_module.grp, "getgrnam", side_effect=KeyError("lxd")
+ )
+ mock_chown = mocker.patch.object(configure_module.os, "chown")
+
+ configure_module.setup_losetup_dir()
+
+ mock_chown.assert_not_called()