Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions imagecraft/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ def register_services() -> None:
ServiceFactory.register(
"project", "ImagecraftProjectService", module="imagecraft.services.project"
)
ServiceFactory.register(
"provider", "Provider", module="imagecraft.services.provider"
)


def _create_app() -> Imagecraft:
Expand Down
145 changes: 145 additions & 0 deletions imagecraft/losetup/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# Copyright 2026 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""losetup client: attach/detach loop devices via the loopserver REST API when
running inside an LXD container, or via the real losetup binary otherwise.
"""

import http.client
import json
import pathlib
import socket
import subprocess
import urllib.parse

from craft_cli import emit

_LXD_GUEST_SOCK = "/dev/lxd/sock"
_LOOPSERVER_SOCK = "/dev/losetup/sock"


def _is_lxd_container() -> bool:
"""Return True if we are running inside an LXD container."""
if not pathlib.Path(_LXD_GUEST_SOCK).exists():
emit.debug(f"LXD guest socket {_LXD_GUEST_SOCK!r} not found; not in a container")
return False
try:
conn = http.client.HTTPConnection("lxd")
conn.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
conn.sock.connect(_LXD_GUEST_SOCK)
conn.request("GET", "/1.0")
data = json.loads(conn.getresponse().read())
instance_type = data.get("instance_type")
if instance_type == "container":
emit.debug(
f"Detected LXD container via {_LXD_GUEST_SOCK!r} "
f"(instance_type={instance_type!r})"
)
return True
emit.debug(
f"LXD guest socket present but instance_type={instance_type!r}; "
"not treating as container"
)
return False
except Exception as exc: # noqa: BLE001
emit.debug(
f"Failed to query LXD guest socket {_LXD_GUEST_SOCK!r}: {exc}; "
"assuming not in a container"
)
return False


def _loopserver_request(endpoint: str, path: str) -> list[str]:
"""POST to the loopserver REST API and return the metadata list."""
query = urllib.parse.urlencode({"path": path})
conn = http.client.HTTPConnection("loopserver")
conn.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
conn.sock.connect(_LOOPSERVER_SOCK)
conn.request("POST", f"{endpoint}?{query}")
response = json.loads(conn.getresponse().read())
if response.get("error_code"):
raise RuntimeError(response.get("error", "unknown error from loopserver"))
return response["metadata"]


def _losetup_attach(path: pathlib.Path) -> list[str]:
"""Attach *path* as a loop device using the real losetup, return all device paths."""
loop_dev = subprocess.run(
["losetup", "--find", "--show", "--partscan", str(path)],
capture_output=True,
text=True,
check=True,
).stdout.strip()

lsblk_data = json.loads(
subprocess.run(
["lsblk", "--json", "--output", "NAME,TYPE", loop_dev],
capture_output=True,
text=True,
check=True,
).stdout
)
devices = [loop_dev]
for child in lsblk_data["blockdevices"][0].get("children", []):
if child.get("type") == "part":
devices.append(f"/dev/{child['name']}")
return devices


def _losetup_detach(device: str) -> list[str]:
"""Detach *device* using the real losetup, return all device paths that were removed."""
lsblk_data = json.loads(
subprocess.run(
["lsblk", "--json", "--output", "NAME,TYPE", device],
capture_output=True,
text=True,
check=True,
).stdout
)
devices = [device]
for child in lsblk_data["blockdevices"][0].get("children", []):
if child.get("type") == "part":
devices.append(f"/dev/{child['name']}")

subprocess.run(["losetup", "--detach", device], check=True)
return devices


def attach(path: pathlib.Path) -> list[str]:
"""Attach *path* as a loop device and return all device paths (loop + partitions).

When running inside an LXD container, delegates to the loopserver REST API.
Otherwise calls the real losetup binary directly.
"""
if _is_lxd_container():
emit.debug(f"Attaching {path} via loopserver at {_LOOPSERVER_SOCK!r}")
return _loopserver_request("/1.0/attach", str(path))
emit.debug(f"Attaching {path} via losetup directly")
return _losetup_attach(path)


def detach(device: str) -> list[str]:
"""Detach the loop device *device* and return all device paths that were removed.

When running inside an LXD container, delegates to the loopserver REST API.
Otherwise calls the real losetup binary directly.
"""
if _is_lxd_container():
emit.debug(f"Detaching {device} via loopserver at {_LOOPSERVER_SOCK!r}")
return _loopserver_request("/1.0/detach", device)
emit.debug(f"Detaching {device} via losetup directly")
return _losetup_detach(device)


__all__ = ["attach", "detach"]
19 changes: 19 additions & 0 deletions imagecraft/losetup/server/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright 2026 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""losetup server package."""

from imagecraft.losetup.server._server import main

__all__ = ["main"]
19 changes: 19 additions & 0 deletions imagecraft/losetup/server/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright 2026 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Allow running the losetup server with ``python3 -m imagecraft.losetup.server``."""

from imagecraft.losetup.server._server import main

main()
50 changes: 50 additions & 0 deletions imagecraft/losetup/server/_cgroup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2026 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""cgroup v2 helpers for determining LXD container membership."""

import socket
import struct


def peer_cgroup(conn: socket.socket) -> str:
"""Return the cgroup v2 string for the process on the other end of *conn*."""
cred = conn.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED, struct.calcsize("3i"))
pid, _uid, _gid = struct.unpack("3i", cred)
with open(f"/proc/{pid}/cgroup") as f:
return f.read()


def parse_lxd_location(cgroup: str) -> tuple[str, str] | None:
"""Return (project, container) from a cgroup v2 string, or None.

The cgroup v2 entry always has the form ``0::$PATH``. Within that path we
walk the components in reverse to find the innermost one that begins with
``lxc.payload.``, then split it to recover the LXD project and container.
LXD project names are restricted to [a-zA-Z0-9-] (no underscores), so the
first ``_`` is always the delimiter between project and container name.
"""
for line in cgroup.splitlines():
if not line.startswith("0::"):
continue
path = line[len("0::"):]
for component in reversed(path.split("/")):
if not component.startswith("lxc.payload."):
continue
project_container = component.split(".")[-1]
parts = project_container.split("_", 1)
if len(parts) == 2:
return parts[0], parts[1]
break # found the v2 line but no lxc.payload component
return None
98 changes: 98 additions & 0 deletions imagecraft/losetup/server/_lxd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Copyright 2026 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""LXD REST API helpers and path translation."""

import http.client
import json
import os
import pathlib
import socket

LXD_SOCKET = "/var/snap/lxd/common/lxd/unix.socket"


def _lxd_connect() -> http.client.HTTPConnection:
conn = http.client.HTTPConnection("lxd")
conn.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
conn.sock.connect(LXD_SOCKET)
return conn


def lxd_get(path: str) -> dict:
"""Make a GET request to the LXD REST API and return the parsed response."""
conn = _lxd_connect()
conn.request("GET", path)
return json.loads(conn.getresponse().read())


def lxd_patch(path: str, body: dict) -> dict:
"""Make a PATCH request to the LXD REST API and return the parsed response."""
conn = _lxd_connect()
data = json.dumps(body).encode()
conn.request("PATCH", path, body=data, headers={"Content-Type": "application/json"})
return json.loads(conn.getresponse().read())


def find_free_loop_slot(devices: dict) -> int:
"""Find the smallest N such that no ``imagecraft-loopN*`` device exists."""
used: set[int] = set()
for name in devices:
if name.startswith("imagecraft-loop"):
num_str = name[len("imagecraft-loop"):].split("p")[0]
if num_str.isdigit():
used.add(int(num_str))
n = 0
while n in used:
n += 1
return n


def convert_container_path_to_host_path(
project: str, container: str, container_path: str
) -> pathlib.Path:
"""Translate *container_path* to an absolute path on the host.

Queries the LXD API for the container's disk devices, finds the one with
the longest ``path`` prefix that matches *container_path* and has a
``source``, then substitutes that source for the prefix.

Raises ``ValueError`` if no sourced device covers the given path.
"""
data = lxd_get(f"/1.0/instances/{container}?project={project}")
devices = data["metadata"]["expanded_devices"]

# Collect disk devices that bind-mount a host source path.
sourced = [
(dev["path"].rstrip("/"), dev["source"])
for dev in devices.values()
if dev.get("type") == "disk" and "source" in dev
]

# Normalise the container path and find the longest matching mount prefix.
norm = os.path.normpath(container_path)
best_mount = best_source = None
for mount, source in sourced:
if norm == mount or norm.startswith(mount + "/"):
if best_mount is None or len(mount) > len(best_mount):
best_mount = mount
best_source = source

if best_mount is None:
raise ValueError(
f"{container_path!r} is not under any sourced device in {container!r}"
)

relative = norm[len(best_mount):] # may be "" if exact match
return pathlib.Path(best_source + relative)
Loading
Loading