diff --git a/AGENTS.md b/AGENTS.md index 1705fe88a..249037cf3 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -71,7 +71,7 @@ dfetch.log ← logging (lowest layer) - **`dfetch/__main__.py`** — CLI entry point; builds argparse subcommands and dispatches - **`dfetch/commands/`** — One file per CLI command (e.g., `update.py`, `check.py`); all inherit from `command.py`'s abstract `Command` base - **`dfetch/manifest/`** — YAML manifest loading/writing with `strictyaml` schema validation; `manifest.py` is the main handler -- **`dfetch/project/`** — Abstract `Subproject`/`Superproject` classes with concrete Git, SVN, and Archive implementations; factory functions `create_sub_project()` and `create_super_project()` are the main entry points +- **`dfetch/project/`** — Concrete `SubProject` domain aggregate that composes with a `Fetcher`; `GitFetcher`, `SvnFetcher`, and `ArchiveFetcher` implement the `Fetcher`/`VcsFetcher` protocols defined in `fetcher.py`; factory functions `create_sub_project()` and `create_super_project()` are the main entry points - **`dfetch/vcs/`** — Low-level VCS operations: `git.py`, `svn.py`, `archive.py` (with hash verification in `integrity_hash.py`), and `patch.py` - **`dfetch/reporting/`** — Output formatters; check results can be emitted as stdout, Jenkins JSON, SARIF, or Code Climate format; SBOM output uses CycloneDX format - **`dfetch/terminal/`** — Terminal UI components (interactive prompts, tree browser, ANSI colors) @@ -84,7 +84,7 @@ dfetch.log ← logging (lowest layer) ### Adding a new VCS backend -Implement the abstract interfaces in `dfetch/project/subproject.py` and `dfetch/vcs/` and register via the factory in `dfetch/project/`. +Implement the `Fetcher` protocol (or `VcsFetcher` if you need branch/tag/revision semantics) defined in `dfetch/project/fetcher.py`, add a concrete class in `dfetch/project/`, add low-level VCS operations in `dfetch/vcs/` if needed, and register via the factory in `dfetch/project/__init__.py`. 
## Testing conventions diff --git a/dfetch/commands/add.py b/dfetch/commands/add.py index a66f4f006..efc7ad28f 100644 --- a/dfetch/commands/add.py +++ b/dfetch/commands/add.py @@ -29,10 +29,8 @@ from dfetch.manifest.remote import Remote from dfetch.manifest.version import Version from dfetch.project import create_sub_project, create_super_project -from dfetch.project.gitsubproject import GitSubProject from dfetch.project.subproject import SubProject from dfetch.project.superproject import SuperProject -from dfetch.project.svnsubproject import SvnSubProject from dfetch.terminal import Entry, LsFunction from dfetch.terminal.tree_browser import ( BrowserConfig, @@ -86,9 +84,9 @@ def browse_tree(subproject: SubProject, version: str = "") -> Generator[LsFuncti Adds '.' as the first entry to allow selecting the repo root (which is treated as empty src). """ - if isinstance(subproject, (GitSubProject, SvnSubProject)): - remote = subproject.remote_repo - with remote.browse_tree(version) as vcs_ls: + vcs = subproject.as_vcs() + if vcs is not None: + with vcs.browse_tree(version) as vcs_ls: def ls(path: str = "") -> list[Entry]: entries = [ @@ -274,36 +272,47 @@ def _finalize_add( Update()(update_args) +def _resolve_entry_version(ctx: _AddContext, raw_version: str) -> Version: + """Resolve a raw version string to a ``Version`` using remote branches and tags. + + For archive-backed subprojects ``raw_version`` is preserved as a revision + identifier (URL or hash) because archives have no branch/tag semantics. 
+ """ + if ctx.subproject.as_vcs() is None: + return Version(revision=raw_version) + branches = ctx.subproject.list_of_branches() + tags = ctx.subproject.list_of_tags() + choices: list[Version] = [ + *[Version(branch=b) for b in prioritise_default(branches, ctx.default_branch)], + *[Version(tag=t) for t in sort_tags_newest_first(tags)], + ] + return _resolve_raw_version(raw_version, choices) or Version( + branch=ctx.default_branch + ) + + def _non_interactive_entry(ctx: _AddContext, overrides: _Overrides) -> ProjectEntry: """Build a ``ProjectEntry`` using inferred defaults (no user interaction).""" - if overrides.version: - branches = ctx.subproject.list_of_branches() - tags = ctx.subproject.list_of_tags() - choices: list[Version] = [ - *[ - Version(branch=b) - for b in prioritise_default(branches, ctx.default_branch) - ], - *[Version(tag=t) for t in sort_tags_newest_first(tags)], - ] - version = _resolve_raw_version(overrides.version, choices) or Version( - branch=ctx.default_branch - ) - else: - version = Version(branch=ctx.default_branch) + version = ( + _resolve_entry_version(ctx, overrides.version) + if overrides.version + else Version(branch=ctx.default_branch) + ) existing_names = {p.name for p in ctx.manifest.projects} - return _build_entry( + entry = _build_entry( name=overrides.name or _unique_name(ctx.default_name, existing_names), remote_url=ctx.url, dst=overrides.dst or ctx.default_dst, version=version, src=overrides.src or "", ignore=overrides.ignore or [], - remote_to_use=ctx.remote_to_use, ) + if ctx.remote_to_use: + entry.set_remote(ctx.remote_to_use) + return entry -def _build_entry( # pylint: disable=too-many-arguments +def _build_entry( *, name: str, remote_url: str, @@ -311,7 +320,6 @@ def _build_entry( # pylint: disable=too-many-arguments version: Version, src: str, ignore: list[str], - remote_to_use: Remote | None, ) -> ProjectEntry: """Assemble a ``ProjectEntry`` from the fields collected by the wizard.""" kind, value = version.field @@ 
-325,10 +333,7 @@ def _build_entry( # pylint: disable=too-many-arguments entry_dict["src"] = src if ignore: entry_dict["ignore"] = ignore - entry = ProjectEntry(entry_dict) - if remote_to_use: - entry.set_remote(remote_to_use) - return entry + return ProjectEntry(entry_dict) # --------------------------------------------------------------------------- @@ -340,15 +345,17 @@ def _show_url_fields( name: str, remote_url: str, default_branch: str, remote_to_use: Remote | None ) -> None: """Print the fields determined solely by the URL (name, remote, url, repo-path).""" - seed = _build_entry( + entry = _build_entry( name=name, remote_url=remote_url, dst=name, version=Version(branch=default_branch), src="", ignore=[], - remote_to_use=remote_to_use, - ).as_yaml() + ) + if remote_to_use: + entry.set_remote(remote_to_use) + seed = entry.as_yaml() logger.print_yaml( {k: seed[k] for k in ("name", "remote", "url", "repo-path") if k in seed} ) @@ -414,15 +421,17 @@ def _interactive_flow(ctx: _AddContext, overrides: _Overrides) -> ProjectEntry: src, ignore = _pick_src_and_ignore(ctx.subproject, version_value, overrides) - return _build_entry( + entry = _build_entry( name=name, remote_url=ctx.url, dst=dst, version=version, src=src, ignore=ignore, - remote_to_use=ctx.remote_to_use, ) + if ctx.remote_to_use: + entry.set_remote(ctx.remote_to_use) + return entry # --------------------------------------------------------------------------- @@ -507,10 +516,12 @@ def _ask_src(ls_function: LsFunction) -> str: src = tree_single_pick(ls_function, "Source path", dirs_selectable=True) return "" if src == "." 
else src - return Prompt.ask( - _PROMPT_FORMAT.format(label="Source path") - + " (sub-path/glob, or Enter to fetch whole repo)", - default="", + return str( + Prompt.ask( + _PROMPT_FORMAT.format(label="Source path") + + " (sub-path/glob, or Enter to fetch whole repo)", + default="", + ) ).strip() diff --git a/dfetch/commands/environment.py b/dfetch/commands/environment.py index b4cace015..a3f602e9a 100644 --- a/dfetch/commands/environment.py +++ b/dfetch/commands/environment.py @@ -16,7 +16,7 @@ import dfetch.commands.command from dfetch.log import get_logger -from dfetch.project import SUPPORTED_SUBPROJECT_TYPES +from dfetch.project import SUPPORTED_FETCHERS logger = get_logger(__name__) @@ -37,5 +37,5 @@ def __call__(self, _: argparse.Namespace) -> None: logger.print_report_line( "platform", f"{platform.system()} {platform.release()}" ) - for project_type in SUPPORTED_SUBPROJECT_TYPES: - project_type.list_tool_info() + for fetcher_type in SUPPORTED_FETCHERS: + fetcher_type.list_tool_info() diff --git a/dfetch/commands/format_patch.py b/dfetch/commands/format_patch.py index 4bc34148e..0962ef738 100644 --- a/dfetch/commands/format_patch.py +++ b/dfetch/commands/format_patch.py @@ -34,9 +34,7 @@ import dfetch.project from dfetch.log import get_logger from dfetch.project import create_super_project -from dfetch.project.gitsubproject import GitSubProject from dfetch.project.subproject import SubProject -from dfetch.project.svnsubproject import SvnSubProject from dfetch.util.util import ( catch_runtime_exceptions, check_no_path_traversal, @@ -145,12 +143,6 @@ def __call__(self, args: argparse.Namespace) -> None: def _determine_target_patch_type(subproject: SubProject) -> PatchType: - """Determine the subproject type for the patch.""" - if isinstance(subproject, GitSubProject): - required_type = PatchType.GIT - elif isinstance(subproject, SvnSubProject): - required_type = PatchType.SVN - else: - required_type = PatchType.PLAIN - - return required_type + """Determine the 
patch format for *subproject*.""" + vcs = subproject.as_vcs() + return vcs.patch_type() if vcs is not None else PatchType.PLAIN diff --git a/dfetch/log.py b/dfetch/log.py index 340190bd7..ce7695135 100644 --- a/dfetch/log.py +++ b/dfetch/log.py @@ -17,15 +17,22 @@ from dfetch import __version__ -class _NoExpandLogRender(LogRender): # pylint: disable=too-few-public-methods - """LogRender that disables table expansion to prevent blank lines in asciicasts.""" +def _make_non_expanding_log_render(**kwargs: Any) -> Any: + """Return a LogRender callable that disables table expansion. - def __call__(self, *args: Any, **kwargs: Any) -> Any: - """Render log entry without expanding the table to the full terminal width.""" - table = super().__call__(*args, **kwargs) + Used when recording with asciinema to prevent Rich's ``expand=True`` from + padding log lines to the full terminal width, which produces spurious blank + lines in the cast player. + """ + renderer = LogRender(**kwargs) + + def _render(*args: Any, **kw: Any) -> Any: + table = renderer(*args, **kw) table.expand = False return table + return _render + def make_console(no_color: bool = False) -> Console: """Create a Rich Console with proper color handling.""" @@ -57,9 +64,10 @@ def configure_root_logger(console: Console | None = None) -> None: # causing the subsequent newline to produce a blank line in the cast # player. Wrapping _log_render so it returns a non-expanding table # removes the trailing spaces and avoids the spurious blank lines. 
- handler._log_render = _NoExpandLogRender( # pylint: disable=protected-access + no_expand = _make_non_expanding_log_render( show_time=False, show_level=False, show_path=False ) + handler._log_render = no_expand # pylint: disable=protected-access logging.basicConfig( level=logging.INFO, diff --git a/dfetch/project/__init__.py b/dfetch/project/__init__.py index 4ce142fe1..9e8fb53ea 100644 --- a/dfetch/project/__init__.py +++ b/dfetch/project/__init__.py @@ -7,37 +7,38 @@ from dfetch.log import get_logger from dfetch.manifest.manifest import Manifest from dfetch.manifest.parse import find_manifest -from dfetch.project.archivesubproject import ArchiveSubProject -from dfetch.project.gitsubproject import GitSubProject +from dfetch.project.archivesubproject import ArchiveFetcher +from dfetch.project.gitsubproject import GitFetcher from dfetch.project.gitsuperproject import GitSuperProject from dfetch.project.subproject import SubProject from dfetch.project.superproject import NoVcsSuperProject, SuperProject -from dfetch.project.svnsubproject import SvnSubProject +from dfetch.project.svnsubproject import SvnFetcher from dfetch.project.svnsuperproject import SvnSuperProject from dfetch.util.util import resolve_absolute_path -SUPPORTED_SUBPROJECT_TYPES: list[ - type[ArchiveSubProject] | type[GitSubProject] | type[SvnSubProject] -] = [ArchiveSubProject, GitSubProject, SvnSubProject] +_AnyFetcherType = type[ArchiveFetcher] | type[GitFetcher] | type[SvnFetcher] +SUPPORTED_FETCHERS: list[_AnyFetcherType] = [ArchiveFetcher, GitFetcher, SvnFetcher] SUPPORTED_SUPERPROJECT_TYPES = [GitSuperProject, SvnSuperProject] +# Backward-compatible alias kept for any external callers.
+SUPPORTED_SUBPROJECT_TYPES = SUPPORTED_FETCHERS + logger = get_logger(__name__) def create_sub_project( project_entry: dfetch.manifest.project.ProjectEntry, ) -> SubProject: - """Create a new SubProject based on a project from the manifest.""" - for project_type in SUPPORTED_SUBPROJECT_TYPES: - if project_type.NAME == project_entry.vcs: - return project_type(project_entry) + """Create a SubProject by selecting the appropriate fetcher for *project_entry*.""" + for fetcher_type in SUPPORTED_FETCHERS: + if fetcher_type.NAME == project_entry.vcs: + return SubProject(project_entry, fetcher_type(project_entry.remote_url)) - for project_type in SUPPORTED_SUBPROJECT_TYPES: - project = project_type(project_entry) + for fetcher_type in SUPPORTED_FETCHERS: + if fetcher_type.handles(project_entry.remote_url): + return SubProject(project_entry, fetcher_type(project_entry.remote_url)) - if project.check(): - return project - raise RuntimeError("vcs type unsupported") + raise RuntimeError(f"vcs type unsupported for {project_entry.remote_url}") def create_super_project() -> SuperProject: diff --git a/dfetch/project/archivesubproject.py b/dfetch/project/archivesubproject.py index 86336214c..630825dee 100644 --- a/dfetch/project/archivesubproject.py +++ b/dfetch/project/archivesubproject.py @@ -1,22 +1,17 @@ -"""Archive (tar/zip) specific implementation. +"""Archive (tar/zip) fetcher implementation. -Archives are a third VCS type alongside ``git`` and ``svn``. They represent -versioned dependencies that are distributed as ``.tar.gz``, ``.tgz``, -``.tar.bz2``, ``.tar.xz`` or ``.zip`` files reachable via ``http://``, -``https://``, or ``file://`` URLs. +Archives are a retrieval strategy alongside git and svn. They represent +dependencies distributed as ``.tar.gz``, ``.tgz``, ``.tar.bz2``, +``.tar.xz``, or ``.zip`` files reachable via ``http://``, ``https://``, +or ``file://`` URLs. -Unlike git and SVN, archives have no inherent "branching" or "tagging" -concept. 
Version identity is expressed through: +Unlike VCS sources, archives have no branching or tagging concept. Version +identity is expressed through: -* **No hash** - the URL itself acts as the identity. The archive is - considered up-to-date as long as the same URL is still reachable. -* **``integrity.hash: <algorithm>:<hash>``** - the cryptographic hash of the - archive file acts as the version identifier. The fetch step verifies the - downloaded archive against this hash and raises an error on mismatch. +* **No hash** — the URL itself acts as the identity. +* **``integrity.hash: <algorithm>:<hash>``** — the cryptographic hash of the + archive file acts as the version identifier. -The ``integrity:`` block is designed for future extension: ``sig:`` and -``sig-key:`` fields for detached signature / signing-key verification will -slot in alongside ``hash:`` without breaking existing manifests. Supported hash algorithms: ``sha256``, ``sha384``, and ``sha512``. Example manifest entries:: @@ -42,12 +37,12 @@ from __future__ import annotations import pathlib +from collections.abc import Sequence from dfetch.log import get_logger from dfetch.manifest.project import ProjectEntry from dfetch.manifest.version import Version from dfetch.project.metadata import Dependency -from dfetch.project.subproject import SubProject from dfetch.util.util import temp_file from dfetch.vcs.archive import ( ARCHIVE_EXTENSIONS, @@ -69,167 +64,112 @@ def _suffix_for_url(url: str) -> str: return ".archive" -class ArchiveSubProject(SubProject): - """A project fetched from a tar/zip archive URL. +class ArchiveFetcher: + """Fetcher for tar/zip archive URLs. - Supports ``src:`` (sub-path extraction), ``ignore:`` (file exclusion) and - ``patch:`` (local patches applied after every fetch) in the same way as - the git and SVN implementations. + Archives are identified by URL or cryptographic hash — not by VCS concepts + such as branches or revisions.
This fetcher implements only the common + :class:`~dfetch.project.fetcher.Fetcher` protocol. """ - NAME = "archive" + NAME: str = "archive" - def __init__(self, project: ProjectEntry) -> None: - """Create an ArchiveSubProject.""" - super().__init__(project) - self._project_entry = project - self._remote_repo = ArchiveRemote(project.remote_url) + def __init__(self, remote: str) -> None: + """Create an ArchiveFetcher for *remote*.""" + self._remote = remote + self._remote_repo = ArchiveRemote(remote) - def check(self) -> bool: - """Return *True* when the project URL looks like an archive.""" - return is_archive_url(self.remote) + @classmethod + def handles(cls, remote: str) -> bool: + """Return True when *remote* looks like an archive URL.""" + return is_archive_url(remote) - @staticmethod - def revision_is_enough() -> bool: - """Archives are uniquely identified by their hash (or URL), so yes.""" - return True - - @staticmethod - def list_tool_info() -> None: - """No external tool info to report; archive fetching uses Python stdlib only.""" - - def get_default_branch(self) -> str: - """Archives have no branches; return an empty string.""" - return "" - - def _latest_revision_on_branch(self, branch: str) -> str: - """For archives the 'latest revision' is always the URL (or hash).""" - del branch - return self.remote - - def _download_and_compute_hash( - self, algorithm: str = "sha256", url: str | None = None - ) -> IntegrityHash: - """Download the archive to a temporary file and return its :class:`IntegrityHash`. - - The hash is computed during the download stream — no extra file read. - The temporary file is always cleaned up, even on error. - - Args: - algorithm: Hash algorithm to use (``sha256``, ``sha384``, ``sha512``). - url: If given, download from this URL instead of ``self._remote_repo``. - Use this to pin to the exact URL stored in the on-disk revision. - - Raises: - RuntimeError: On download failure or unsupported algorithm. 
- """ - effective_url = url if url is not None else self.remote - remote = ArchiveRemote(effective_url) if url is not None else self._remote_repo - with temp_file(_suffix_for_url(effective_url)) as tmp_path: - hex_digest = remote.download(tmp_path, algorithm=algorithm) - return IntegrityHash(algorithm, hex_digest) - - def _does_revision_exist(self, revision: str) -> bool: # noqa: ARG002 - """Check whether the archive URL is still reachable. - - A lightweight HEAD (or partial-GET) reachability check is used for - all revision types, including hash-pinned ones. Full content-integrity - verification is intentionally deferred to fetch time (``_fetch_impl``), - keeping ``dfetch check`` fast even for large archives over slow links. - """ - return self._remote_repo.is_accessible() - - def _list_of_tags(self) -> list[str]: - """Archives have no tags; returns an empty list.""" - return [] - - @property - def wanted_version(self) -> Version: - """Version derived from the ``integrity.hash`` field or the archive URL. + def wanted_version(self, project_entry: ProjectEntry) -> Version: + """Version derived from ``integrity.hash`` or the archive URL. - * With ``integrity.hash: :`` → ``Version(revision=':')`` + * With hash → ``Version(revision=':')`` * Without hash → ``Version(revision=)`` - - This makes the standard :class:`~dfetch.project.subproject.SubProject` - comparison machinery work transparently for archives. """ - if self._project_entry.hash: - return Version(revision=self._project_entry.hash) - return Version(revision=self.remote) - - def _fetch_impl(self, version: Version) -> tuple[Version, list[Dependency]]: - """Download and extract the archive to the local destination. - - 1. Download the archive to a temporary file. - 2. If ``integrity.hash`` is specified, verify the downloaded file. - 3. Extract to :attr:`local_path`, respecting ``src:`` and ``ignore:``. 
+ if project_entry.hash: + return Version(revision=project_entry.hash) + return Version(revision=self._remote) + + def latest_available_version(self, wanted: Version) -> Version | None: + """Return *wanted* if the archive URL is still reachable, else None.""" + return wanted if self._remote_repo.is_accessible() else None + + def fetch( + self, + version: Version, + local_path: str, + name: str, + source: str, + ignore: Sequence[str], + ) -> tuple[Version, list[Dependency]]: + """Download and extract the archive to *local_path*. Raises: RuntimeError: On download failure or hash mismatch. - - Returns: - The version that was actually fetched (hash string or URL). """ - revision = version.revision - - pathlib.Path(self.local_path).mkdir(parents=True, exist_ok=True) - - with temp_file(_suffix_for_url(self.remote)) as tmp_path: - expected = IntegrityHash.parse(revision) - if expected: - actual_hex = self._remote_repo.download( - tmp_path, algorithm=expected.algorithm - ) - if not expected.matches(actual_hex): - raise RuntimeError( - f"Hash mismatch for {self._project_entry.name}! " - f"{expected.algorithm} expected {expected.hex_digest}" - ) - else: - self._remote_repo.download(tmp_path) + pathlib.Path(local_path).mkdir(parents=True, exist_ok=True) + with temp_file(_suffix_for_url(self._remote)) as tmp_path: + self._download_and_verify(version.revision, tmp_path, name) ArchiveLocalRepo.extract( tmp_path, - self.local_path, - src=self.source, - ignore=self.ignore, + local_path, + src=source, + ignore=ignore, ) return version, [] - def freeze_project(self, project: ProjectEntry) -> str | None: - """Pin *project* to a cryptographic hash of the archive. + def _download_and_verify(self, revision: str, tmp_path: str, name: str) -> None: + expected = IntegrityHash.parse(revision) + if expected: + actual_hex = self._remote_repo.download( + tmp_path, algorithm=expected.algorithm + ) + if not expected.matches(actual_hex): + raise RuntimeError( + f"Hash mismatch for {name}! 
" + f"{expected.algorithm} expected {expected.hex_digest}" + ) + else: + self._remote_repo.download(tmp_path) - * If the archive was already fetched with a hash, the on-disk revision - (``sha256:``) is written to ``integrity.hash`` in the manifest. - * If the archive was fetched without a hash (URL-only), the archive is - downloaded again, its SHA-256 is computed, and the result is written - to ``integrity.hash``. This ensures the manifest always ends up - pinned to a specific content fingerprint. SHA-256 is used as the - default algorithm when no prior hash is present. + def freeze( + self, project: ProjectEntry, on_disk_version: Version | None + ) -> str | None: + """Pin *project* to a cryptographic hash of the archive. - Returns: - The ``:`` string written to *project*, or *None* if - the manifest was already up-to-date. + If already hash-pinned, the on-disk hash is reused. Otherwise the + archive is downloaded and its SHA-256 is computed. Raises: - RuntimeError: On download or hash-computation failure so the caller - can log a meaningful error rather than silently claiming the - project is already pinned. + RuntimeError: On download or hash-computation failure. """ - on_disk = self.on_disk_version() - if not on_disk: + if not on_disk_version: return None - - revision = on_disk.revision - - # Already hash-pinned — use the on-disk revision directly. - # Otherwise download from the revision URL (not the possibly-updated manifest URL). 
- pinned = IntegrityHash.parse(revision) or self._download_and_compute_hash( - "sha256", url=revision + pinned = IntegrityHash.parse(on_disk_version.revision) or ( + self._download_and_compute_hash("sha256", url=on_disk_version.revision) ) new_hash = str(pinned) if project.hash == new_hash: return None project.hash = new_hash return new_hash + + def _download_and_compute_hash( + self, algorithm: str = "sha256", url: str | None = None + ) -> IntegrityHash: + """Download the archive and return its :class:`IntegrityHash`.""" + effective_url = url if url is not None else self._remote + remote = ArchiveRemote(effective_url) if url is not None else self._remote_repo + with temp_file(_suffix_for_url(effective_url)) as tmp_path: + hex_digest = remote.download(tmp_path, algorithm=algorithm) + return IntegrityHash(algorithm, hex_digest) + + @staticmethod + def list_tool_info() -> None: + """No external tool required; archive fetching uses Python stdlib only.""" diff --git a/dfetch/project/fetcher.py b/dfetch/project/fetcher.py new file mode 100644 index 000000000..552cafb5d --- /dev/null +++ b/dfetch/project/fetcher.py @@ -0,0 +1,176 @@ +"""Fetcher protocols and shared VCS base for subproject composition.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections.abc import Callable, Sequence +from contextlib import AbstractContextManager +from typing import Protocol, runtime_checkable + +from dfetch.manifest.project import ProjectEntry +from dfetch.manifest.version import Version +from dfetch.project.metadata import Dependency +from dfetch.util.versions import latest_tag_from_list +from dfetch.vcs.patch import PatchType + + +@runtime_checkable +class Fetcher(Protocol): + """How a dependency is retrieved from its remote. + + Implemented by all retrieval strategies: git, svn, and archive. 
+ """ + + NAME: str + + @classmethod + def handles(cls, remote: str) -> bool: + """Return True when this fetcher can handle the given remote URL.""" + raise NotImplementedError + + def fetch( + self, + version: Version, + local_path: str, + name: str, + source: str, + ignore: Sequence[str], + ) -> tuple[Version, list[Dependency]]: + """Retrieve *version* and place it at *local_path*.""" + raise NotImplementedError + + def wanted_version(self, project_entry: ProjectEntry) -> Version: + """Derive the desired version from the manifest entry.""" + raise NotImplementedError + + def freeze( + self, project: ProjectEntry, on_disk_version: Version | None + ) -> str | None: + """Pin *project* to *on_disk_version*; return pinned string or None.""" + + def latest_available_version(self, wanted: Version) -> Version | None: + """Return the latest version matching *wanted*, or None if unavailable.""" + + @staticmethod + def list_tool_info() -> None: + """Print the installed VCS tool version to the report log.""" + + +@runtime_checkable +class VcsFetcher(Fetcher, Protocol): + """Fetcher with full VCS semantics: branches, tags, revision uniqueness.""" + + def revision_is_enough(self) -> bool: + """Return True when a revision alone uniquely identifies a version.""" + raise NotImplementedError + + def get_default_branch(self) -> str: + """Return the default branch name for this repository.""" + raise NotImplementedError + + def list_of_tags(self) -> list[str]: + """Return all available tags.""" + raise NotImplementedError + + def list_of_branches(self) -> list[str]: + """Return all available branches.""" + raise NotImplementedError + + def latest_revision_on_branch(self, branch: str) -> str: + """Return the latest revision on *branch*.""" + raise NotImplementedError + + def does_revision_exist(self, revision: str) -> bool: + """Return True if *revision* exists on the remote.""" + raise NotImplementedError + + def browse_tree( + self, version: str + ) -> 
AbstractContextManager[Callable[[str], list[tuple[str, bool]]]]: + """Return a context manager yielding a directory-listing callable.""" + raise NotImplementedError + + def patch_type(self) -> PatchType: + """Return the patch format used by this VCS.""" + raise NotImplementedError + + +class AbstractVcsFetcher(ABC): + """Shared implementation for VCS-backed fetchers (git and svn). + + Concrete subclasses must implement the abstract leaf methods. + ``latest_available_version`` and ``freeze`` are implemented here to avoid + duplication between git and svn. + """ + + @abstractmethod + def revision_is_enough(self) -> bool: + """Return True when a revision alone uniquely identifies a version.""" + + @abstractmethod + def get_default_branch(self) -> str: + """Return the default branch name.""" + + @abstractmethod + def list_of_tags(self) -> list[str]: + """Return all available tags.""" + + @abstractmethod + def does_revision_exist(self, revision: str) -> bool: + """Return True if *revision* exists on the remote.""" + + @abstractmethod + def latest_revision_on_branch(self, branch: str) -> str: + """Return the latest revision on *branch*.""" + + def latest_available_version(self, wanted: Version) -> Version | None: + """Return the latest version matching *wanted*, or None if unavailable.""" + if wanted.tag: + return self._latest_tag_version(wanted.tag) + if self._is_revision_only(wanted): + return self._revision_version_if_exists(wanted.revision) + branch = self._resolve_branch(wanted.branch) + revision = self.latest_revision_on_branch(branch) + return Version(revision=revision, branch=branch) if revision else None + + def _is_revision_only(self, wanted: Version) -> bool: + return not wanted.branch and bool(wanted.revision) and self.revision_is_enough() + + def _resolve_branch(self, wanted_branch: str) -> str: + if wanted_branch == " ": + return "" + return wanted_branch or self.get_default_branch() + + def _latest_tag_version(self, tag: str) -> Version | None: + tags = 
self.list_of_tags() + if tag not in tags: + return None + return Version(tag=latest_tag_from_list(tag, tags)) + + def _revision_version_if_exists(self, revision: str) -> Version | None: + return ( + Version(revision=revision) if self.does_revision_exist(revision) else None + ) + + def freeze( + self, project: ProjectEntry, on_disk_version: Version | None + ) -> str | None: + """Pin *project* to the on-disk version; return pinned string or None.""" + if not on_disk_version: + return None + if self._is_already_pinned(project, on_disk_version): + return None + project.version = on_disk_version + return on_disk_version.revision or on_disk_version.tag or str(on_disk_version) + + def _is_already_pinned( + self, project: ProjectEntry, on_disk_version: Version + ) -> bool: + if project.version.tag: + return project.version.tag == on_disk_version.tag + if not project.version.revision or not on_disk_version.revision: + return False + return ( + project.version.revision == on_disk_version.revision + and self.revision_is_enough() + ) diff --git a/dfetch/project/gitsubproject.py b/dfetch/project/gitsubproject.py index c66640ac0..5b96389e2 100644 --- a/dfetch/project/gitsubproject.py +++ b/dfetch/project/gitsubproject.py @@ -1,127 +1,141 @@ -"""Git specific implementation.""" +"""Git fetcher implementation.""" import pathlib -from functools import lru_cache +from collections.abc import Callable, Sequence +from contextlib import AbstractContextManager from dfetch.log import get_logger from dfetch.manifest.project import ProjectEntry from dfetch.manifest.version import Version +from dfetch.project.fetcher import AbstractVcsFetcher from dfetch.project.metadata import Dependency -from dfetch.project.subproject import SubProject from dfetch.util.license import LICENSE_GLOBS from dfetch.util.util import safe_rm from dfetch.vcs.git import CheckoutOptions, GitLocalRepo, GitRemote +from dfetch.vcs.git_types import Submodule +from dfetch.vcs.patch import PatchType logger = 
get_logger(__name__) -class GitSubProject(SubProject): - """A git subproject.""" +class GitFetcher(AbstractVcsFetcher): + """Fetcher for git repositories.""" - NAME = "git" + NAME: str = "git" - def __init__(self, project: ProjectEntry): - """Create a Git subproject.""" - super().__init__(project) - self._remote_repo = GitRemote(self.remote) + def __init__(self, remote: str) -> None: + """Create a GitFetcher for *remote*.""" + self._remote = remote + self._remote_repo = GitRemote(remote) + self._default_branch: str | None = None - @property - def remote_repo(self) -> GitRemote: - """Return the underlying remote repository object.""" - return self._remote_repo + @classmethod + def handles(cls, remote: str) -> bool: + """Return True when *remote* is a git repository.""" + return bool(GitRemote(remote).is_git()) - def check(self) -> bool: - """Check if is GIT.""" - return bool(self._remote_repo.is_git()) + def revision_is_enough(self) -> bool: + """Git SHAs are globally unique; revision alone identifies a commit.""" + return True - def _latest_revision_on_branch(self, branch: str) -> str: - """Get the latest revision on a branch.""" - return str(self._remote_repo.last_sha_on_branch(branch)) + def get_default_branch(self) -> str: + """Return the default branch of this repository (cached after first network fetch).""" + if self._default_branch is None: + self._default_branch = self._remote_repo.get_default_branch() + return self._default_branch - def _does_revision_exist(self, revision: str) -> bool: - """Check if the given revision exists.""" - return self._remote_repo.check_version_exists(revision) - - def _list_of_tags(self) -> list[str]: - """Get list of all available tags.""" + def list_of_tags(self) -> list[str]: + """Return all tags.""" return [str(tag) for tag in self._remote_repo.list_of_tags()] def list_of_branches(self) -> list[str]: - """Get list of all available branches.""" + """Return all branches.""" return [str(branch) for branch in 
self._remote_repo.list_of_branches()] - @staticmethod - def revision_is_enough() -> bool: - """See if this VCS can uniquely distinguish branch with revision only.""" - return True + def latest_revision_on_branch(self, branch: str) -> str: + """Return the latest commit SHA on *branch*.""" + return str(self._remote_repo.last_sha_on_branch(branch)) - @staticmethod - def list_tool_info() -> None: - """Print out version information.""" - try: - tool, version = GitLocalRepo.get_tool_version() - SubProject._log_tool(tool, version) - except RuntimeError as exc: - logger.debug( - f"Something went wrong trying to get the version of git: {exc}" - ) - SubProject._log_tool("git", "") + def does_revision_exist(self, revision: str) -> bool: + """Return True if *revision* exists on the remote.""" + return self._remote_repo.check_version_exists(revision) + + def browse_tree( + self, version: str + ) -> AbstractContextManager[Callable[[str], list[tuple[str, bool]]]]: + """Return a context manager yielding a directory-listing callable.""" + return self._remote_repo.browse_tree(version) + + def patch_type(self) -> PatchType: + """Return git patch format.""" + return PatchType.GIT - def _fetch_impl(self, version: Version) -> tuple[Version, list[Dependency]]: - """Get the revision of the remote and place it at the local path.""" + def wanted_version(self, project_entry: ProjectEntry) -> Version: + """Derive the desired version from the manifest entry.""" + return Version( + branch=project_entry.branch, + tag=project_entry.tag, + revision=project_entry.revision, + ) + + def fetch( + self, + version: Version, + local_path: str, + name: str, + source: str, + ignore: Sequence[str], + ) -> tuple[Version, list[Dependency]]: + """Checkout *version* from git and place it at *local_path*.""" rev_or_branch_or_tag = self._determine_what_to_fetch(version) - # When exporting a file, the destination directory must already exist - pathlib.Path(self.local_path).mkdir(parents=True, exist_ok=True) + 
pathlib.Path(local_path).mkdir(parents=True, exist_ok=True) - license_globs = [f"/{name.lower()}" for name in LICENSE_GLOBS] + [ - f"/{name.upper()}" for name in LICENSE_GLOBS + license_globs = [f"/{n.lower()}" for n in LICENSE_GLOBS] + [ + f"/{n.upper()}" for n in LICENSE_GLOBS ] - local_repo = GitLocalRepo(self.local_path) + local_repo = GitLocalRepo(local_path) fetched_sha, submodules = local_repo.checkout_version( CheckoutOptions( - remote=self.remote, + remote=self._remote, version=rev_or_branch_or_tag, - src=self.source, + src=source, must_keeps=license_globs + [".gitmodules"], - ignore=self.ignore, + ignore=ignore, ) ) - vcs_deps = [] - for submodule in submodules: - self._log_project( - f'Found & fetched submodule "./{submodule.path}" ' - f" ({submodule.url} @ {Version(tag=submodule.tag, branch=submodule.branch, revision=submodule.sha)})", - ) - vcs_deps.append( - Dependency( - remote_url=submodule.url, - destination=submodule.path, - branch=submodule.branch, - tag=submodule.tag, - revision=submodule.sha, - source_type="git-submodule", - ) - ) + vcs_deps = [self._submodule_dependency(sub, name) for sub in submodules] targets = {local_repo.METADATA_DIR, local_repo.GIT_MODULES_FILE} - - for path in pathlib.Path(self.local_path).rglob("*"): + for path in pathlib.Path(local_path).rglob("*"): if path.name in targets: safe_rm(path) return self._determine_fetched_version(version, fetched_sha), vcs_deps + def _submodule_dependency(self, submodule: Submodule, name: str) -> Dependency: + logger.print_info_line( + name, + f'Found & fetched submodule "./{submodule.path}" ' + f" ({submodule.url} @ {Version(tag=submodule.tag, branch=submodule.branch, revision=submodule.sha)})", + ) + return Dependency( + remote_url=submodule.url, + destination=submodule.path, + branch=submodule.branch, + tag=submodule.tag, + revision=submodule.sha, + source_type="git-submodule", + ) + def _determine_what_to_fetch(self, version: Version) -> str: - """Based on asked version, target to 
fetch.""" if version.revision and 0 < len(version.revision) < 40: raise RuntimeError( "Shortened revisions (SHA) in manifests cannot be used," " use complete revision or a branch (or tags instead)" ) - return ( version.revision or version.tag @@ -130,17 +144,19 @@ def _determine_what_to_fetch(self, version: Version) -> str: ) def _determine_fetched_version(self, version: Version, fetched_sha: str) -> Version: - """Based on asked & fetched version, determine info of fetched version.""" branch = version.branch or self.get_default_branch() if version.tag: return Version(tag=version.tag, branch=branch) + return Version(branch=branch, revision=version.revision or fetched_sha) - return Version( - branch=branch, - revision=version.revision or fetched_sha, - ) - - @lru_cache - def get_default_branch(self) -> str: # type: ignore - """Get the default branch of this repository.""" - return self._remote_repo.get_default_branch() + @staticmethod + def list_tool_info() -> None: + """Print the installed git version.""" + try: + tool, version = GitLocalRepo.get_tool_version() + get_logger(__name__).print_report_line(tool, version.strip()) + except RuntimeError as exc: + logger.debug( + f"Something went wrong trying to get the version of git: {exc}" + ) + get_logger(__name__).print_report_line("git", "") diff --git a/dfetch/project/gitsuperproject.py b/dfetch/project/gitsuperproject.py index b46c439ac..e4ccd739c 100644 --- a/dfetch/project/gitsuperproject.py +++ b/dfetch/project/gitsuperproject.py @@ -13,7 +13,7 @@ from dfetch.log import get_logger from dfetch.manifest.manifest import Manifest from dfetch.manifest.project import ProjectEntry -from dfetch.project.gitsubproject import GitSubProject +from dfetch.project.gitsubproject import GitFetcher from dfetch.project.subproject import SubProject from dfetch.project.superproject import RevisionRange, SuperProject from dfetch.util.util import check_no_path_traversal, resolve_absolute_path @@ -37,7 +37,7 @@ def check(path: str | 
pathlib.Path) -> bool: def get_sub_project(self, project: ProjectEntry) -> SubProject | None: """Get the subproject in the same vcs type as the superproject.""" - return GitSubProject(project) + return SubProject(project, GitFetcher(project.remote_url)) def ignored_files(self, path: str) -> Sequence[str]: """Return a list of files that can be ignored in a given path.""" diff --git a/dfetch/project/subproject.py b/dfetch/project/subproject.py index dbc4f6bea..802a5ff6d 100644 --- a/dfetch/project/subproject.py +++ b/dfetch/project/subproject.py @@ -1,118 +1,180 @@ -"""SubProject.""" +"""SubProject — the domain aggregate for a vendored dependency.""" import os import pathlib -from abc import ABC, abstractmethod from collections.abc import Callable, Sequence from dfetch.log import get_logger from dfetch.manifest.project import ProjectEntry from dfetch.manifest.version import Version from dfetch.project.abstract_check_reporter import AbstractCheckReporter -from dfetch.project.metadata import Dependency, InvalidMetadataError, Metadata +from dfetch.project.fetcher import Fetcher, VcsFetcher +from dfetch.project.metadata import InvalidMetadataError, Metadata from dfetch.util.util import hash_directory, safe_rm -from dfetch.util.versions import latest_tag_from_list from dfetch.vcs.patch import Patch logger = get_logger(__name__) -class SubProject(ABC): # pylint: disable=too-many-public-methods - """Abstract SubProject object. +class SubProject: + """A vendored dependency declared in the manifest. - This object represents one Project entry in the Manifest. - It can be updated. + Orchestrates the update lifecycle (fetch, patch, metadata persistence) + and delegates all VCS- or archive-specific behaviour to a :class:`Fetcher`. 
""" - NAME = "" - - def __init__(self, project: ProjectEntry) -> None: - """Create the subproject.""" + def __init__(self, project: ProjectEntry, fetcher: Fetcher) -> None: + """Create a SubProject backed by *fetcher*.""" self.__project = project - self.__metadata = Metadata.from_project_entry(self.__project) - + self.__fetcher = fetcher + self.__metadata = Metadata.from_project_entry(project) self._show_animations = not self._running_in_ci() - @staticmethod - def _running_in_ci() -> bool: - """Are we running in CI.""" - ci_env_var = os.getenv("CI", "") - return bool(ci_env_var) and ci_env_var[0].lower() in ("t", "1", "y") + # ------------------------------------------------------------------ + # VCS dispatch + # ------------------------------------------------------------------ + + def as_vcs(self) -> VcsFetcher | None: + """Return the fetcher cast to VcsFetcher, or None for archive.""" + return self.__fetcher if isinstance(self.__fetcher, VcsFetcher) else None + + # ------------------------------------------------------------------ + # Thin delegates exposed to command layer + # ------------------------------------------------------------------ + + def list_of_branches(self) -> list[str]: + """Return all branches, or an empty list for archive dependencies.""" + vcs = self.as_vcs() + return vcs.list_of_branches() if vcs else [] + + def list_of_tags(self) -> list[str]: + """Return all tags, or an empty list for archive dependencies.""" + vcs = self.as_vcs() + return vcs.list_of_tags() if vcs else [] + + def get_default_branch(self) -> str: + """Return the default branch, or an empty string for archive dependencies.""" + vcs = self.as_vcs() + return vcs.get_default_branch() if vcs else "" + + # ------------------------------------------------------------------ + # Core properties + # ------------------------------------------------------------------ + + @property + def name(self) -> str: + """Project name.""" + return self.__project.name + + @property + def 
local_path(self) -> str: + """Local destination path.""" + return self.__project.destination + + @property + def wanted_version(self) -> Version: + """Desired version as expressed in the manifest.""" + return self.__fetcher.wanted_version(self.__project) + + @property + def metadata_path(self) -> str: + """Path to the on-disk metadata file.""" + return self.__metadata.path + + @property + def remote(self) -> str: + """Remote URL.""" + return self.__metadata.remote_url + + @property + def source(self) -> str: + """Source sub-path within the remote.""" + return self.__project.source + + @property + def ignore(self) -> Sequence[str]: + """Files/folders to exclude after fetch.""" + return self.__project.ignore + + @property + def patch(self) -> Sequence[str]: + """Patch files to apply after fetch.""" + return self.__project.patch + + # ------------------------------------------------------------------ + # Version resolution + # ------------------------------------------------------------------ def check_wanted_with_local(self) -> tuple[Version | None, Version | None]: - """Given the project entry in the manifest, get the relevant version from disk. + """Return (wanted, have) version pair for the current manifest entry. - Returns: - Tuple[Optional[Version], Optional[Version]]: Wanted, Have + For archive dependencies, identity is the revision field (URL or hash). + For VCS dependencies, branch and revision semantics apply. 
""" on_disk = self.on_disk_version() - if not on_disk: return (self.wanted_version, None) + vcs = self.as_vcs() + if vcs is not None: + return self._resolve_vcs_versions(vcs, on_disk) + return ( + Version(revision=self.wanted_version.revision), + Version(revision=on_disk.revision), + ) + def _resolve_vcs_versions( + self, vcs: VcsFetcher, on_disk: Version + ) -> tuple[Version, Version]: + """Return (wanted, have) using VCS branch/revision semantics.""" if self.wanted_version.tag: - return (Version(tag=self.wanted_version.tag), Version(tag=on_disk.tag)) - + return ( + Version(tag=self.wanted_version.tag), + Version(tag=on_disk.tag), + ) wanted_branch, on_disk_branch = "", "" - if not (self.wanted_version.revision and self.revision_is_enough()): - wanted_branch = self.wanted_version.branch or self.get_default_branch() + if not (self.wanted_version.revision and vcs.revision_is_enough()): + wanted_branch = self.wanted_version.branch or vcs.get_default_branch() on_disk_branch = on_disk.branch - - wanted_revision = ( - self.wanted_version.revision - or self._latest_revision_on_branch(wanted_branch) + wanted_revision = self.wanted_version.revision or vcs.latest_revision_on_branch( + wanted_branch ) - return ( - Version( - revision=wanted_revision, - branch=wanted_branch, - ), + Version(revision=wanted_revision, branch=wanted_branch), Version(revision=on_disk.revision, branch=on_disk_branch), ) def update_is_required(self, force: bool = False) -> Version | None: - """Check if this project should be upgraded. - - Args: - force (bool, optional): Ignore if versions match. - Defaults to False. 
- """ + """Return the version to fetch, or None when already up-to-date.""" wanted, current = self.check_wanted_with_local() - if not force and wanted == current: self._log_project(f"up-to-date ({current})") return None - logger.debug(f"{self.__project.name} Current ({current}), Available ({wanted})") return wanted + # ------------------------------------------------------------------ + # Update lifecycle + # ------------------------------------------------------------------ + def update( self, force: bool = False, ignored_files_callback: Callable[[], Sequence[str]] | None = None, patch_count: int = -1, ) -> None: - """Update this subproject if required. + """Fetch and install this subproject if an update is required. Args: - force (bool, optional): Ignore if version is ok or any local changes were done. - Defaults to False. - ignored_files_callback (Callable, optional): Called to obtain the set of files - to ignore. Invoked twice: once before clearing the destination (to detect - pre-existing local changes) and once after extraction (to compute the stored - hash). Calling it at both points ensures the stored hash and the check-time - hash use the same skiplist, preventing false "local changes" reports. - patch_count (int, optional): Number of patches to apply (-1 means all). + force: Ignore version match and local-change checks. + ignored_files_callback: Called before and after fetch to obtain + files that should not contribute to the stored hash. + patch_count: Number of patches to apply (-1 means all). 
""" to_fetch = self.update_is_required(force) - if not to_fetch: return - pre_fetch_ignored = ( - list(ignored_files_callback()) if ignored_files_callback else [] - ) + pre_fetch_ignored = self._collect_ignored(ignored_files_callback) if not force and self._are_there_local_changes(pre_fetch_ignored): self._log_project( @@ -129,14 +191,18 @@ def update( f"Fetching {to_fetch}", enabled=self._show_animations, ): - actually_fetched, dependency = self._fetch_impl(to_fetch) + actually_fetched, dependency = self.__fetcher.fetch( + to_fetch, + self.local_path, + self.__project.name, + self.source, + self.ignore, + ) self._log_project(f"Fetched {actually_fetched}") applied_patches = self._apply_patches(patch_count) - post_fetch_ignored = ( - list(ignored_files_callback()) if ignored_files_callback else [] - ) + post_fetch_ignored = self._collect_ignored(ignored_files_callback) self.__metadata.fetched( actually_fetched, @@ -151,69 +217,52 @@ def update( logger.debug(f"Writing repo metadata to: {self.__metadata.path}") self.__metadata.dump() + def _collect_ignored( + self, callback: Callable[[], Sequence[str]] | None + ) -> list[str]: + return list(callback()) if callback else [] + def _apply_patches(self, count: int = -1) -> list[str]: - """Apply the patches.""" + """Apply manifest patches; return list of applied patch paths.""" cwd = pathlib.Path(".").resolve() applied_patches = [] count = len(self.__project.patch) if count == -1 else count for patch in self.__project.patch[:count]: - patch_path = (cwd / patch).resolve() - try: relative_patch_path = patch_path.relative_to(cwd) except ValueError: self._log_project(f'Skipping patch "{patch}" which is outside {cwd}.') continue - if not patch_path.exists(): self._log_project(f"Skipping non-existent patch {patch}") continue - - normalized_patch_path = str(relative_patch_path.as_posix()) - - self._log_project(f'Applying patch "{normalized_patch_path}"') - result = Patch.from_file(normalized_patch_path).apply(root=self.local_path) 
- + normalized = str(relative_patch_path.as_posix()) + self._log_project(f'Applying patch "{normalized}"') + result = Patch.from_file(normalized).apply(root=self.local_path) if result.encoding_warning: self._log_project( - f'After retrying found that patch-file "{normalized_patch_path}" ' + f'After retrying found that patch-file "{normalized}" ' "is not UTF-8 encoded, consider saving it with UTF-8 encoding." ) - - applied_patches.append(normalized_patch_path) + applied_patches.append(normalized) return applied_patches - def _report_unavailable_version( - self, reporters: Sequence[AbstractCheckReporter] - ) -> None: - """Report that the wanted version is not available on the remote.""" - for reporter in reporters: - reporter.unavailable_project_version(self.__project, self.wanted_version) - - def _report_unfetched_project( - self, reporters: Sequence[AbstractCheckReporter], latest_version: Version - ) -> None: - """Report that the project is not fetched yet.""" - for reporter in reporters: - reporter.unfetched_project( - self.__project, self.wanted_version, latest_version - ) - - def _report_local_changes(self, reporters: Sequence[AbstractCheckReporter]) -> None: - """Report that there are local changes.""" - for reporter in reporters: - reporter.local_changes(self.__project) + # ------------------------------------------------------------------ + # Check for updates + # ------------------------------------------------------------------ def check_for_update( self, reporters: Sequence[AbstractCheckReporter], files_to_ignore: Sequence[str] ) -> None: - """Check if there is an update available.""" + """Check whether a newer version is available and report via *reporters*.""" on_disk_version = self.on_disk_version() with logger.status( self.__project.name, "Checking", enabled=self._show_animations ): - latest_version = self._check_for_newer_version() + latest_version = self.__fetcher.latest_available_version( + self.wanted_version + ) if not latest_version: 
self._report_unavailable_version(reporters) @@ -230,20 +279,19 @@ def check_for_update( latest_version, on_disk_version, reporters ) - def _versions_match( - self, latest_version: Version, on_disk_version: Version - ) -> bool: - """Return True when latest and on-disk versions are considered equal.""" - return (latest_version == on_disk_version) or ( - self.revision_is_enough() - and bool(latest_version.revision) - and latest_version.revision == on_disk_version.revision - ) + def _versions_match(self, latest: Version, on_disk: Version) -> bool: + vcs = self.as_vcs() + if vcs is not None: + return (latest == on_disk) or ( + vcs.revision_is_enough() + and bool(latest.revision) + and latest.revision == on_disk.revision + ) + return latest == on_disk def _select_check_action( self, latest_version: Version, on_disk_version: Version ) -> Callable[[AbstractCheckReporter], None]: - """Return the single reporter callback that matches the version comparison.""" if self._versions_match(latest_version, on_disk_version): return lambda r: r.up_to_date_project(self.__project, latest_version) if on_disk_version == self.wanted_version: @@ -264,88 +312,22 @@ def _check_latest_with_on_disk_version( for reporter in reporters: report(reporter) - def _log_project(self, msg: str) -> None: - logger.print_info_line(self.__project.name, msg) + # ------------------------------------------------------------------ + # Freeze + # ------------------------------------------------------------------ - @staticmethod - def _log_tool(name: str, msg: str) -> None: - logger.print_report_line(name, msg.strip()) - - @property - def name(self) -> str: - """Get the name of this project.""" - return self.__project.name - - @property - def local_path(self) -> str: - """Get the local destination of this project.""" - return self.__project.destination - - @property - def wanted_version(self) -> Version: - """Get the wanted version of this subproject.""" - return self.__metadata.version - - @property - def 
metadata_path(self) -> str: - """Get the path of the metadata.""" - return self.__metadata.path - - @property - def remote(self) -> str: - """Get the remote URL of this subproject.""" - return self.__metadata.remote_url - - @property - def source(self) -> str: - """Get the source folder of this subproject.""" - return self.__project.source - - @property - def ignore(self) -> Sequence[str]: - """Get the files/folders to ignore of this subproject.""" - return self.__project.ignore - - @property - def patch(self) -> Sequence[str]: - """Get the patches of this project.""" - return self.__project.patch - - @abstractmethod - def check(self) -> bool: - """Check if it can handle the type.""" - - @staticmethod - @abstractmethod - def revision_is_enough() -> bool: - """See if this VCS can uniquely distinguish branch with revision only.""" - - @abstractmethod - def _latest_revision_on_branch(self, branch: str) -> str: - """Get the latest revision on a branch.""" - - @abstractmethod - def _does_revision_exist(self, revision: str) -> bool: - """Check if the given revision exists.""" - - @abstractmethod - def _list_of_tags(self) -> list[str]: - """Get list of all available tags.""" + def freeze_project(self, project: ProjectEntry) -> str | None: + """Pin *project* to its current on-disk version via the fetcher.""" + return self.__fetcher.freeze(project, self.on_disk_version()) - @staticmethod - @abstractmethod - def list_tool_info() -> None: - """Print out version information.""" + # ------------------------------------------------------------------ + # On-disk state + # ------------------------------------------------------------------ def on_disk_version(self) -> Version | None: - """Get the version of the project on disk. 
- - Returns: - Version: Could be None of no on disk version - """ + """Read the on-disk version from metadata; return None if absent or invalid.""" if not os.path.exists(self.__metadata.path): return None - try: return Metadata.from_file(self.__metadata.path).version except InvalidMetadataError: @@ -357,14 +339,9 @@ def on_disk_version(self) -> Version | None: return None def _on_disk_hash(self) -> str | None: - """Get the hash of the project on disk. - - Returns: - Str: Could be None if no on disk version - """ + """Read the stored directory hash; return None if absent or invalid.""" if not os.path.exists(self.__metadata.path): return None - try: return Metadata.from_file(self.__metadata.path).hash except InvalidMetadataError: @@ -375,106 +352,48 @@ def _on_disk_hash(self) -> str | None: ) return None - def _revision_only_mode(self) -> bool: - """Return True when the wanted version should be resolved by revision alone.""" - return ( - not self.wanted_version.branch - and bool(self.wanted_version.revision) - and self.revision_is_enough() - ) - - def _check_for_newer_version(self) -> Version | None: - """Check if a newer version is available on the given branch. - - In case wanted_version does not exist (anymore) on the remote return None. 
- """ - if self.wanted_version.tag: - available_tags = self._list_of_tags() - if self.wanted_version.tag not in available_tags: - return None - return Version( - tag=latest_tag_from_list(self.wanted_version.tag, available_tags) - ) - if self.wanted_version.branch == " ": - branch = "" - else: - branch = self.wanted_version.branch or self.get_default_branch() - - if self._revision_only_mode(): - return ( - Version(revision=self.wanted_version.revision) - if self._does_revision_exist(self.wanted_version.revision) - else None - ) - - revision = self._latest_revision_on_branch(branch) - return Version(revision=revision, branch=branch) if revision else None - def _are_there_local_changes(self, files_to_ignore: Sequence[str]) -> bool: - """Check if there are local changes. - - Returns: - Bool: True if there are local changes, false if no were detected or no hash was found. - """ logger.debug(f"Checking if there were local changes in {self.local_path}") on_disk_hash = self._on_disk_hash() - return bool(on_disk_hash) and on_disk_hash != hash_directory( self.local_path, skiplist=[self.__metadata.FILENAME] + list(files_to_ignore), ) - @abstractmethod - def _fetch_impl(self, version: Version) -> tuple[Version, list[Dependency]]: - """Fetch the given version of the subproject, should be implemented by the child class.""" + # ------------------------------------------------------------------ + # Reporters + # ------------------------------------------------------------------ - @abstractmethod - def get_default_branch(self) -> str: - """Get the default branch of this repository.""" + def _report_unavailable_version( + self, reporters: Sequence[AbstractCheckReporter] + ) -> None: + for reporter in reporters: + reporter.unavailable_project_version(self.__project, self.wanted_version) - def list_of_branches(self) -> list[str]: - """Get list of all available branches. 
Override in VCS-specific subclasses.""" - return [] + def _report_unfetched_project( + self, reporters: Sequence[AbstractCheckReporter], latest_version: Version + ) -> None: + for reporter in reporters: + reporter.unfetched_project( + self.__project, self.wanted_version, latest_version + ) - def list_of_tags(self) -> list[str]: - """Get list of all available tags (public wrapper around ``_list_of_tags``).""" - return self._list_of_tags() + def _report_local_changes(self, reporters: Sequence[AbstractCheckReporter]) -> None: + for reporter in reporters: + reporter.local_changes(self.__project) - def freeze_project(self, project: ProjectEntry) -> str | None: - """Freeze *project* to its current on-disk version. + # ------------------------------------------------------------------ + # Logging helpers + # ------------------------------------------------------------------ - Subclasses may override this to apply VCS-specific freeze logic (e.g. - :class:`~dfetch.project.archivesubproject.ArchiveSubProject` stores - the hash under ``integrity.hash`` rather than ``revision:``). + def _log_project(self, msg: str) -> None: + logger.print_info_line(self.__project.name, msg) - Returns: - The version string that was written to *project* when a change was - made, or *None* if the entry was already pinned to the on-disk - version or no on-disk version could be determined. + @staticmethod + def _log_tool(name: str, msg: str) -> None: + logger.print_report_line(name, msg.strip()) - Raises: - RuntimeError: When VCS-specific freeze logic fails (e.g. archive - download error). Callers should catch and report these. 
- """ - on_disk_version = self.on_disk_version() - if on_disk_version and self._is_already_pinned(project, on_disk_version): - return None - if on_disk_version: - project.version = on_disk_version - return ( - on_disk_version.revision or on_disk_version.tag or str(on_disk_version) - ) - return None - - def _is_already_pinned( - self, project: ProjectEntry, on_disk_version: Version - ) -> bool: - """Return True if *project* is already pinned to *on_disk_version*.""" - if project.version.tag: - return project.version.tag == on_disk_version.tag - if not project.version.revision or not on_disk_version.revision: - return False - return ( - project.version.revision == on_disk_version.revision - and self.revision_is_enough() - ) + @staticmethod + def _running_in_ci() -> bool: + ci_env_var = os.getenv("CI", "") + return bool(ci_env_var) and ci_env_var[0].lower() in ("t", "1", "y") diff --git a/dfetch/project/svnsubproject.py b/dfetch/project/svnsubproject.py index 3a1cdfdf0..7efe4b02c 100644 --- a/dfetch/project/svnsubproject.py +++ b/dfetch/project/svnsubproject.py @@ -1,164 +1,190 @@ -"""SVN specific implementation.""" +"""SVN fetcher implementation.""" import os import pathlib import urllib.parse +from collections.abc import Callable, Sequence +from contextlib import AbstractContextManager from dfetch.log import get_logger from dfetch.manifest.project import ProjectEntry from dfetch.manifest.version import Version +from dfetch.project.fetcher import AbstractVcsFetcher from dfetch.project.metadata import Dependency -from dfetch.project.subproject import SubProject from dfetch.util.license import is_license_file from dfetch.util.util import ( find_matching_files, find_non_matching_files, safe_rm, ) +from dfetch.vcs.patch import PatchType from dfetch.vcs.svn import SvnRemote, SvnRepo, get_svn_version logger = get_logger(__name__) -class SvnSubProject(SubProject): - """A svn subproject.""" +class SvnFetcher(AbstractVcsFetcher): + """Fetcher for SVN repositories.""" - NAME = 
"svn" + NAME: str = "svn" - def __init__(self, project: ProjectEntry): - """Create a Svn subproject.""" - super().__init__(project) - self._remote_repo = SvnRemote(self.remote) + def __init__(self, remote: str) -> None: + """Create a SvnFetcher for *remote*.""" + self._remote = remote + self._remote_repo = SvnRemote(remote) - @property - def remote_repo(self) -> SvnRemote: - """Return the underlying remote repository object.""" - return self._remote_repo + @classmethod + def handles(cls, remote: str) -> bool: + """Return True when *remote* is an SVN repository.""" + return SvnRemote(remote).is_svn() - def check(self) -> bool: - """Check if is SVN.""" - return self._remote_repo.is_svn() - - @staticmethod - def revision_is_enough() -> bool: - """See if this VCS can uniquely distinguish branch with revision only.""" + def revision_is_enough(self) -> bool: + """SVN revisions are not branch-unique; revision alone is insufficient.""" return False - def _latest_revision_on_branch(self, branch: str) -> str: - """Get the latest revision on a branch.""" + def get_default_branch(self) -> str: + """Return SVN trunk as the default branch.""" + return SvnRepo.DEFAULT_BRANCH + + def list_of_tags(self) -> list[str]: + """Return all tags.""" + return self._remote_repo.list_of_tags() + + def list_of_branches(self) -> list[str]: + """Return trunk plus branches found under ``branches/``.""" + return [SvnRepo.DEFAULT_BRANCH, *self._remote_repo.list_of_branches()] + + def latest_revision_on_branch(self, branch: str) -> str: + """Return the latest revision number on *branch*.""" if branch not in (SvnRepo.DEFAULT_BRANCH, "", " "): branch = f"branches/{branch}" return self._get_revision(branch) - def _does_revision_exist(self, revision: str) -> bool: - """Check if the given revision exists.""" + def does_revision_exist(self, revision: str) -> bool: + """Not supported for SVN; revision requires a branch context.""" raise NotImplementedError( "In SVN only a revision is NOT enough, this 
should not be called!" ) - def _list_of_tags(self) -> list[str]: - """Get list of all available tags.""" - return self._remote_repo.list_of_tags() + def latest_available_version(self, wanted: Version) -> Version | None: + """Return the latest version matching *wanted*, or None if unavailable. - @staticmethod - def list_tool_info() -> None: - """Print out version information.""" - try: - tool, version = get_svn_version() - SubProject._log_tool(tool, version) - except RuntimeError as exc: - logger.debug( - f"Something went wrong trying to get the version of svn: {exc}" - ) - SubProject._log_tool("svn", "") + For revision-only pins (no branch, no tag) the pinned revision is + returned with the default branch so version comparison works correctly. + SVN revisions are globally ordered within a repository, so a bare + ``revision:`` in the manifest is always relative to trunk. + """ + if wanted.revision and not wanted.branch and not wanted.tag: + return Version(revision=wanted.revision, branch=self.get_default_branch()) + return super().latest_available_version(wanted) + + def browse_tree( + self, version: str + ) -> AbstractContextManager[Callable[[str], list[tuple[str, bool]]]]: + """Return a context manager yielding a directory-listing callable.""" + return self._remote_repo.browse_tree(version) + + def patch_type(self) -> PatchType: + """Return SVN patch format.""" + return PatchType.SVN + + def wanted_version(self, project_entry: ProjectEntry) -> Version: + """Derive the desired version from the manifest entry.""" + return Version( + branch=project_entry.branch, + tag=project_entry.tag, + revision=project_entry.revision, + ) - def _determine_what_to_fetch(self, version: Version) -> tuple[str, str, str]: - """Based on the given version, determine what to fetch. 
+ def fetch( + self, + version: Version, + local_path: str, + name: str, + source: str, + ignore: Sequence[str], + ) -> tuple[Version, list[Dependency]]: + """Export *version* from SVN and place it at *local_path*.""" + logger.debug("Fetching SVN dependency: %s", name) + branch, branch_path, revision = self._determine_what_to_fetch(version) - Args: - version (Version): Version that needs to be fetched + complete_path = "/".join( + filter(None, [self._remote, branch_path.strip(), source]) + ).strip("/") - Raises: - RuntimeError: Invalid revision + pathlib.Path(os.path.dirname(local_path)).mkdir(parents=True, exist_ok=True) - Returns: - Tuple[str, str, str]: branch, branch_path, revision - """ - if version.tag: - branch_path = f"tags/{version.tag}/" - branch = "" - elif version.branch == " ": - branch_path = "" - branch = " " - else: - branch = version.branch or SvnRepo.DEFAULT_BRANCH - branch_path = ( - f"branches/{branch}" - if branch != SvnRepo.DEFAULT_BRANCH - else SvnRepo.DEFAULT_BRANCH - ) + complete_path, file_pattern = self._parse_file_pattern(complete_path) - branch_path = urllib.parse.quote(branch_path) + SvnRepo.export(complete_path, revision, local_path) - revision = version.revision or self._get_revision(branch_path) + if file_pattern: + self._apply_file_pattern(local_path, file_pattern, source) - if not revision.isdigit(): - raise RuntimeError(f"{revision} must be a number for SVN") + if source: + self._copy_license_files(local_path, branch_path, revision) - return (branch, branch_path, revision) + if ignore: + self._remove_ignored_files(local_path, ignore) - def _remove_ignored_files(self) -> None: - """Remove any ignored files, whilst keeping license files.""" - for file_or_dir in find_matching_files(self.local_path, self.ignore): - if not (file_or_dir.is_file() and is_license_file(file_or_dir.name)): - safe_rm(file_or_dir) + return Version(tag=version.tag, branch=branch, revision=revision), [] - def _fetch_impl(self, version: Version) -> 
tuple[Version, list[Dependency]]: - """Get the revision of the remote and place it at the local path.""" - branch, branch_path, revision = self._determine_what_to_fetch(version) + def _apply_file_pattern( + self, local_path: str, file_pattern: str, source: str + ) -> None: + for file in find_non_matching_files(local_path, (file_pattern,)): + os.remove(file) + if not os.listdir(local_path): + logger.warning( + f"The 'src:' filter '{source}' didn't match any files" + f" from '{self._remote}'" + ) - complete_path = "/".join( - filter(None, [self.remote, branch_path.strip(), self.source]) - ).strip("/") + def _copy_license_files( + self, local_path: str, branch_path: str, revision: str + ) -> None: + root_branch_path = "/".join([self._remote, branch_path]).strip("/") + license_files = SvnFetcher._license_files(root_branch_path) + if license_files: + dest = ( + local_path if os.path.isdir(local_path) else os.path.dirname(local_path) + ) + SvnRepo.export(f"{root_branch_path}/{license_files[0]}", revision, dest) - # When exporting a file, the destination directory must already exist - pathlib.Path(os.path.dirname(self.local_path)).mkdir( - parents=True, exist_ok=True - ) + def _remove_ignored_files(self, local_path: str, ignore: Sequence[str]) -> None: + for file_or_dir in find_matching_files(local_path, ignore): + if not (file_or_dir.is_file() and is_license_file(file_or_dir.name)): + safe_rm(file_or_dir) - complete_path, file_pattern = self._parse_file_pattern(complete_path) + def _resolve_branch_path(self, version: Version) -> tuple[str, str]: + """Return (branch, raw_branch_path) from version without URL-encoding.""" + if version.tag: + return "", f"tags/{version.tag}/" + if version.branch == " ": + return " ", "" + branch = version.branch or SvnRepo.DEFAULT_BRANCH + branch_path = ( + f"branches/{branch}" + if branch != SvnRepo.DEFAULT_BRANCH + else SvnRepo.DEFAULT_BRANCH + ) + return branch, branch_path - SvnRepo.export(complete_path, revision, self.local_path) + def 
_determine_what_to_fetch(self, version: Version) -> tuple[str, str, str]: + """Return (branch, branch_path, revision) for the given version.""" + branch, branch_path = self._resolve_branch_path(version) + branch_path = urllib.parse.quote(branch_path) + revision = version.revision or self._get_revision(branch_path) - if file_pattern: - for file in find_non_matching_files(self.local_path, (file_pattern,)): - os.remove(file) - if not os.listdir(self.local_path): - logger.warning( - f"The 'src:' filter '{self.source}' didn't match any files from '{self.remote}'" - ) - - if self.source: - root_branch_path = "/".join([self.remote, branch_path]).strip("/") - license_files = SvnSubProject._license_files(root_branch_path) - if license_files: - dest = ( - self.local_path - if os.path.isdir(self.local_path) - else os.path.dirname(self.local_path) - ) - SvnRepo.export(f"{root_branch_path}/{license_files[0]}", revision, dest) - - if self.ignore: - self._remove_ignored_files() + if not revision.isdigit(): + raise RuntimeError(f"{revision} must be a number for SVN") - return Version(tag=version.tag, branch=branch, revision=revision), [] + return branch, branch_path, revision @staticmethod def _parse_file_pattern(complete_path: str) -> tuple[str, str]: if complete_path.count("*") > 1: raise RuntimeError("Only single * supported in 'src:'!") - glob_filter = "" if complete_path.count("*") == 1: before, after = complete_path.split("*", maxsplit=1) @@ -167,22 +193,26 @@ def _parse_file_pattern(complete_path: str) -> tuple[str, str]: return complete_path, glob_filter def _get_info(self, branch: str) -> dict[str, str]: - return SvnRepo.get_info_from_target(f"{self.remote}/{branch}") + return SvnRepo.get_info_from_target(f"{self._remote}/{branch}") @staticmethod def _license_files(url_path: str) -> list[str]: return [ - str(license) - for license in filter(is_license_file, SvnRepo.files_in_path(url_path)) + str(license_file) + for license_file in filter(is_license_file, 
SvnRepo.files_in_path(url_path)) ] def _get_revision(self, branch: str) -> str: return self._get_info(branch)["Revision"] - def get_default_branch(self) -> str: - """Get the default branch of this repository.""" - return SvnRepo.DEFAULT_BRANCH - - def list_of_branches(self) -> list[str]: - """Return trunk plus any branches found under ``branches/``.""" - return [SvnRepo.DEFAULT_BRANCH, *self._remote_repo.list_of_branches()] + @staticmethod + def list_tool_info() -> None: + """Print the installed svn version.""" + try: + tool, version = get_svn_version() + get_logger(__name__).print_report_line(tool, version.strip()) + except RuntimeError as exc: + logger.debug( + f"Something went wrong trying to get the version of svn: {exc}" + ) + get_logger(__name__).print_report_line("svn", "") diff --git a/dfetch/project/svnsuperproject.py b/dfetch/project/svnsuperproject.py index c3d708bbe..6073f48fb 100644 --- a/dfetch/project/svnsuperproject.py +++ b/dfetch/project/svnsuperproject.py @@ -15,7 +15,7 @@ from dfetch.manifest.project import ProjectEntry from dfetch.project.subproject import SubProject from dfetch.project.superproject import RevisionRange, SuperProject -from dfetch.project.svnsubproject import SvnSubProject +from dfetch.project.svnsubproject import SvnFetcher from dfetch.util.util import ( check_no_path_traversal, in_directory, @@ -42,7 +42,7 @@ def check(path: str | pathlib.Path) -> bool: def get_sub_project(self, project: ProjectEntry) -> SubProject | None: """Get the subproject in the same vcs type as the superproject.""" - return SvnSubProject(project) + return SubProject(project, SvnFetcher(project.remote_url)) def ignored_files(self, path: str) -> Sequence[str]: """Return a list of files that can be ignored in a given path.""" diff --git a/dfetch/terminal/pick.py b/dfetch/terminal/pick.py index 5432a260b..0de596c8a 100644 --- a/dfetch/terminal/pick.py +++ b/dfetch/terminal/pick.py @@ -74,16 +74,16 @@ def _render_pick_item( return f" {cursor} 
{check}{styled}" -def _render_pick_lines( # pylint: disable=too-many-arguments,too-many-positional-arguments +def _render_pick_lines( title: str, items: list[str], idx: int, top: int, selected: set[int], multi: bool, - n: int, ) -> list[str]: """Build the list of lines to draw for one frame of the pick widget.""" + n = len(items) lines: list[str] = [f" {BOLD}{title}{RESET}"] if top > 0: lines.append(f" {DIM}↑ {top} more above{RESET}") @@ -132,9 +132,7 @@ def scrollable_pick( while True: idx = max(0, min(idx, n - 1)) top = _clamp_scroll(idx, top) - screen.draw( - _render_pick_lines(title, display_items, idx, top, selected, multi, n) - ) + screen.draw(_render_pick_lines(title, display_items, idx, top, selected, multi)) key = read_key() if key in ("UP", "DOWN", "PGUP", "PGDN"): diff --git a/dfetch/terminal/prompt.py b/dfetch/terminal/prompt.py index 8a154b869..34dbe1c9a 100644 --- a/dfetch/terminal/prompt.py +++ b/dfetch/terminal/prompt.py @@ -91,9 +91,11 @@ def numbered_prompt( n = len(entries) while True: - raw = Prompt.ask( - _PROMPT_FORMAT.format(label=label) + f" ({hint})", - default=default, + raw: str = str( + Prompt.ask( + _PROMPT_FORMAT.format(label=label) + f" ({hint})", + default=default, + ) ).strip() if raw.isdigit(): @@ -114,4 +116,4 @@ def prompt(label: str, default: str = "") -> str: """ if is_tty(): return ghost_prompt(f" {GREEN}?{RESET} {label}", default).strip() - return Prompt.ask(_PROMPT_FORMAT.format(label=label), default=default).strip() + return str(Prompt.ask(_PROMPT_FORMAT.format(label=label), default=default)).strip() diff --git a/dfetch/util/license.py b/dfetch/util/license.py index 312365334..72d685102 100644 --- a/dfetch/util/license.py +++ b/dfetch/util/license.py @@ -119,8 +119,7 @@ def guess_license_in_file( probable_licenses = infer_license.api.probabilities(license_text) - return ( - None - if not probable_licenses - else License.from_inferred(*probable_licenses[0], text=license_text) - ) + if not probable_licenses: + return None + 
inferred_lic, probability = probable_licenses[0] + return License.from_inferred(inferred_lic, probability, text=license_text) diff --git a/doc/static/uml/c3_dfetch_components_project.puml b/doc/static/uml/c3_dfetch_components_project.puml index 917898119..1b813f160 100644 --- a/doc/static/uml/c3_dfetch_components_project.puml +++ b/doc/static/uml/c3_dfetch_components_project.puml @@ -12,22 +12,29 @@ System_Boundary(DFetch, "Dfetch") { Boundary(DfetchProject, "Project", "python", "Main project that has a manifest.") { Component(compAbstractCheckReporter, "AbstractCheckReporter", "python", "Abstract interface for generating a check report.") - Component(compArchiveSub, "ArchiveSubProject", "python", "A subproject based on an archive.") - Component(compGitSub, "GitSubproject", "python", "A subproject based on git.") + Component(compSubProject, "SubProject", "python", "Concrete domain aggregate; composes with a Fetcher.") + Component(compFetcher, "Fetcher", "python", "Protocol: minimal contract for all fetcher types.") + Component(compVcsFetcher, "VcsFetcher", "python", "Protocol: Fetcher extended with VCS semantics (branches, tags, revisions).") + Component(compAbstractVcsFetcher, "AbstractVcsFetcher", "python", "Shared VCS implementation (latest_available_version, freeze).") + Component(compGitFetcher, "GitFetcher", "python", "Fetcher implementation for Git repositories.") + Component(compSvnFetcher, "SvnFetcher", "python", "Fetcher implementation for SVN repositories.") + Component(compArchiveFetcher, "ArchiveFetcher", "python", "Fetcher implementation for archive URLs (no VCS semantics).") Component(compMetadata, "Metadata", "python", "A file containing metadata about a project.") - Component(compSvnSub, "SvnSubproject", "python", "A subproject based on svn.") - Component(compSubProject, "SubProject", "python", "An abstract subproject.") Component(compSuperProject, "SuperProject", "python", "An abstract superproject.") Component(compGitSuper, "GitSuperProject", 
"python", "A superproject based on git.") Component(compSvnSuper, "SvnSuperProject", "python", "A superproject based on svn.") Component(compNoVcsSuper, "NoVcsSuperProject", "python", "A superproject without VCS.") - Rel_U(compArchiveSub, compSubProject, "Implements") - Rel_U(compGitSub, compSubProject, "Implements") - Rel_U(compSvnSub, compSubProject, "Implements") + Rel(compSubProject, compFetcher, "Composes") Rel(compSubProject, compAbstractCheckReporter, "Uses") Rel_L(compSubProject, compMetadata, "Uses") + Rel_U(compVcsFetcher, compFetcher, "Extends") + Rel_U(compAbstractVcsFetcher, compVcsFetcher, "Implements") + Rel_U(compGitFetcher, compAbstractVcsFetcher, "Extends") + Rel_U(compSvnFetcher, compAbstractVcsFetcher, "Extends") + Rel_U(compArchiveFetcher, compFetcher, "Implements") + Rel_U(compGitSuper, compSuperProject, "Implements") Rel_U(compSvnSuper, compSuperProject, "Implements") Rel_U(compNoVcsSuper, compSuperProject, "Implements") @@ -43,9 +50,9 @@ System_Boundary(DFetch, "Dfetch") { Rel_U(compMetadata, contManifest, "Has") Rel(compSuperProject, contManifest, "Has") Rel_U(contReporting, contManifest, "Uses") - Rel(compArchiveSub, contVcs, "Uses") - Rel(compGitSub, contVcs, "Uses") - Rel(compSvnSub, contVcs, "Uses") + Rel(compGitFetcher, contVcs, "Uses") + Rel(compSvnFetcher, contVcs, "Uses") + Rel(compArchiveFetcher, contVcs, "Uses") Rel(compGitSuper, contVcs, "Uses") Rel(compSvnSuper, contVcs, "Uses") } diff --git a/pyproject.toml b/pyproject.toml index 887a4028e..a33738c00 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -155,6 +155,10 @@ max-line-length = 120 disable = "logging-fstring-interpolation" min-similarity-lines = 10 +[tool.pylint.design] +max-args = 6 +max-positional-arguments = 6 + [tool.pylint.MASTER] ignore-paths = [ "doc/_build/", diff --git a/tests/test_add.py b/tests/test_add.py index 7796290a2..274c1cf98 100644 --- a/tests/test_add.py +++ b/tests/test_add.py @@ -66,9 +66,12 @@ def _make_subproject( branches if branches is not 
None else [default_branch] ) sp.list_of_tags.return_value = tags if tags is not None else [] - # browse_tree returns an empty ls_fn by default (no remote tree available) - sp.browse_tree.return_value.__enter__ = Mock(return_value=lambda path="": []) - sp.browse_tree.return_value.__exit__ = Mock(return_value=False) + + # as_vcs() returns a VCS mock with browse_tree as a context manager + vcs_mock = Mock() + vcs_mock.browse_tree.return_value.__enter__ = Mock(return_value=lambda path="": []) + vcs_mock.browse_tree.return_value.__exit__ = Mock(return_value=False) + sp.as_vcs.return_value = vcs_mock return sp diff --git a/tests/test_archive.py b/tests/test_archive.py index 50c967765..d2cf53b8f 100644 --- a/tests/test_archive.py +++ b/tests/test_archive.py @@ -14,7 +14,8 @@ from dfetch.manifest.project import ProjectEntry from dfetch.manifest.version import Version -from dfetch.project.archivesubproject import ArchiveSubProject, _suffix_for_url +from dfetch.project.archivesubproject import ArchiveFetcher, _suffix_for_url +from dfetch.project.subproject import SubProject from dfetch.vcs.archive import ( ARCHIVE_EXTENSIONS, ArchiveLocalRepo, @@ -619,9 +620,14 @@ def _file_url(path: str) -> str: return pathlib.Path(path).as_uri() -def _make_subproject(url: str) -> ArchiveSubProject: - return ArchiveSubProject( - ProjectEntry({"name": "pkg", "url": url, "vcs": "archive"}) +def _make_fetcher(url: str) -> ArchiveFetcher: + return ArchiveFetcher(url) + + +def _make_subproject(url: str) -> SubProject: + return SubProject( + ProjectEntry({"name": "pkg", "url": url, "vcs": "archive"}), + ArchiveFetcher(url), ) @@ -636,9 +642,9 @@ def test_download_and_compute_hash_default_uses_remote_repo(): archive = os.path.join(tmp, "pkg.tar.gz") _make_tar_gz(archive) url = _file_url(archive) - sp = _make_subproject(url) + fetcher = _make_fetcher(url) - result = sp._download_and_compute_hash("sha256") + result = fetcher._download_and_compute_hash("sha256") assert result.algorithm == "sha256" 
assert result.hex_digest == _sha256_file(archive) @@ -659,11 +665,11 @@ def test_download_and_compute_hash_explicit_url_overrides_remote_repo(): url_a = _file_url(archive_a) url_b = _file_url(archive_b) - # SubProject points to archive_b (current manifest URL). - sp = _make_subproject(url_b) + # Fetcher points to archive_b (current manifest URL). + fetcher = _make_fetcher(url_b) # Passing url=url_a must use archive_a's content. - result = sp._download_and_compute_hash("sha256", url=url_a) + result = fetcher._download_and_compute_hash("sha256", url=url_a) assert result.hex_digest == _sha256_file(archive_a) assert result.hex_digest != _sha256_file(archive_b) diff --git a/tests/test_subproject.py b/tests/test_subproject.py index fcf645d64..2665a1fc8 100644 --- a/tests/test_subproject.py +++ b/tests/test_subproject.py @@ -3,49 +3,74 @@ # mypy: ignore-errors # flake8: noqa +from contextlib import ExitStack, nullcontext from typing import Optional, Union -from unittest.mock import MagicMock, call, patch +from unittest.mock import MagicMock, patch import pytest from dfetch.manifest.project import ProjectEntry from dfetch.manifest.version import Version +from dfetch.project.fetcher import AbstractVcsFetcher from dfetch.project.metadata import Dependency from dfetch.project.subproject import SubProject +from dfetch.vcs.patch import PatchType -class ConcreteSubProject(SubProject): - _wanted_version: Version +class MockVcsFetcher(AbstractVcsFetcher): + """Minimal concrete VCS fetcher for unit tests.""" - def _fetch_impl(self, version: Version) -> tuple[Version, list[Dependency]]: - return Version(), [] + NAME: str = "mock" - def _latest_revision_on_branch(self, branch): - return "latest" + def __init__(self, wanted: Version | None = None) -> None: + self._wanted = wanted or Version() - def check(self): + @classmethod + def handles(cls, remote: str) -> bool: return False - @staticmethod - def list_tool_info(): - pass - - @staticmethod - def revision_is_enough(): + def 
revision_is_enough(self) -> bool: return False - def _does_revision_exist(self, revision): - return True + def get_default_branch(self) -> str: + return "" - @property - def wanted_version(self): - return self._wanted_version + def list_of_tags(self) -> list[str]: + return [] - def _list_of_tags(self): + def list_of_branches(self) -> list[str]: return [] - def get_default_branch(self): - return "" + def latest_revision_on_branch(self, branch: str) -> str: + return "latest" + + def does_revision_exist(self, revision: str) -> bool: + return True + + def browse_tree(self, version: str) -> object: + return nullcontext(lambda path="": []) + + def patch_type(self) -> PatchType: + return PatchType.GIT + + def fetch( + self, version, local_path, name, source, ignore + ) -> tuple[Version, list[Dependency]]: + return version, [] + + def wanted_version(self, project_entry: ProjectEntry) -> Version: + return self._wanted + + @staticmethod + def list_tool_info() -> None: + pass + + +def _make_subproject( + name: str = "proj1", wanted: Version | None = None +) -> tuple[SubProject, MockVcsFetcher]: + fetcher = MockVcsFetcher(wanted) + return SubProject(ProjectEntry({"name": name}), fetcher), fetcher @pytest.mark.parametrize( @@ -98,13 +123,11 @@ def test_check_wanted_with_local( ): with patch("dfetch.project.subproject.os.path.exists") as mocked_path_exists: with patch("dfetch.project.subproject.Metadata.from_file") as mocked_metadata: - subproject = ConcreteSubProject(ProjectEntry({"name": "proj1"})) + subproject, _ = _make_subproject(wanted=given_wanted) mocked_path_exists.return_value = bool(given_on_disk) mocked_metadata().version = given_on_disk - subproject._wanted_version = given_wanted - wanted, have = subproject.check_wanted_with_local() assert wanted == expect_wanted @@ -126,7 +149,7 @@ def test_are_there_local_changes( with patch( "dfetch.project.subproject.SubProject._on_disk_hash" ) as mocked_on_disk_hash: - subproject = ConcreteSubProject(ProjectEntry({"name": 
"proj1"})) + subproject, _ = _make_subproject() mocked_on_disk_hash.return_value = hash_in_metadata mocked_hash_directory.return_value = current_hash @@ -150,25 +173,34 @@ def test_update_uses_ignored_files_callback_for_stored_hash(): # Return different values on successive calls to simulate pre/post extraction callback = MagicMock(side_effect=[pre_fetch_ignored, post_fetch_ignored]) - with patch("dfetch.project.subproject.os.path.exists") as mock_exists: - with patch("dfetch.project.subproject.Metadata.from_file") as mock_meta_file: - with patch("dfetch.project.subproject.hash_directory") as mock_hash: - with patch("dfetch.project.subproject.safe_rm"): - with patch("dfetch.project.subproject.Metadata.dump"): - mock_exists.return_value = True - mock_meta_file.return_value.version = Version(revision="abc") - mock_hash.return_value = "hash123" + with ExitStack() as stack: + mock_exists = stack.enter_context( + patch("dfetch.project.subproject.os.path.exists") + ) + mock_meta_file = stack.enter_context( + patch("dfetch.project.subproject.Metadata.from_file") + ) + mock_hash = stack.enter_context( + patch("dfetch.project.subproject.hash_directory") + ) + stack.enter_context(patch("dfetch.project.subproject.safe_rm")) + stack.enter_context(patch("dfetch.project.metadata.Metadata.fetched")) + stack.enter_context(patch("dfetch.project.metadata.Metadata.dump")) + + mock_exists.return_value = True + mock_meta_file.return_value.version = Version(revision="abc") + mock_meta_file.return_value.hash = None + mock_hash.return_value = "hash123" - subproject = ConcreteSubProject(ProjectEntry({"name": "p1"})) - subproject._wanted_version = Version(revision="new") + subproject, _ = _make_subproject(name="p1", wanted=Version(revision="new")) - subproject.update(force=True, ignored_files_callback=callback) + subproject.update(force=True, ignored_files_callback=callback) - assert callback.call_count == 2 - # The hash must be computed with the post-fetch ignored list - hash_call_skiplist 
= mock_hash.call_args[1]["skiplist"] - assert "new_ignored.txt" in hash_call_skiplist - assert "old_file.txt" not in hash_call_skiplist + assert callback.call_count == 2 + # The hash must be computed with the post-fetch ignored list + hash_call_skiplist = mock_hash.call_args[1]["skiplist"] + assert "new_ignored.txt" in hash_call_skiplist + assert "old_file.txt" not in hash_call_skiplist @pytest.mark.parametrize( @@ -227,7 +259,7 @@ def test_freeze_project( ): with patch("dfetch.project.subproject.os.path.exists") as mocked_path_exists: with patch("dfetch.project.subproject.Metadata.from_file") as mocked_metadata: - subproject = ConcreteSubProject(ProjectEntry({"name": "proj1"})) + subproject, _ = _make_subproject() mocked_path_exists.return_value = bool(on_disk_version) mocked_metadata().version = on_disk_version @@ -263,4 +295,4 @@ def test_ci_enabled( else: monkeypatch.setenv("CI", str(ci_env_value)) - assert ConcreteSubProject._running_in_ci() == expected_result + assert SubProject._running_in_ci() == expected_result diff --git a/tests/test_svn.py b/tests/test_svn.py index bc40dd32f..a434219f0 100644 --- a/tests/test_svn.py +++ b/tests/test_svn.py @@ -9,7 +9,8 @@ import pytest from dfetch.manifest.project import ProjectEntry -from dfetch.project.svnsubproject import SvnSubProject +from dfetch.project.subproject import SubProject +from dfetch.project.svnsubproject import SvnFetcher from dfetch.util.cmdline import SubprocessCommandError from dfetch.vcs.svn import External, SvnRemote, SvnRepo @@ -223,11 +224,16 @@ def test_get_info(): @pytest.fixture def svn_subproject(): - return SvnSubProject(ProjectEntry({"name": "proj3", "url": "some_url"})) + return SubProject( + ProjectEntry({"name": "proj3", "url": "some_url"}), + SvnFetcher("some_url"), + ) def test_svn_subproject_name(svn_subproject): - assert svn_subproject.NAME == "svn" + vcs = svn_subproject.as_vcs() + assert vcs is not None + assert vcs.NAME == "svn" @pytest.fixture