From 94cdf0512517917770bb0d8765743b33f0edd95e Mon Sep 17 00:00:00 2001 From: "Kacper Kowalik (Xarthisius)" Date: Mon, 16 Mar 2026 15:05:50 -0500 Subject: [PATCH] Switch to pathlib to fix Win compatibility: --- tro_utils/cli.py | 14 ++++---- tro_utils/tro_utils.py | 75 +++++++++++++++++++++++++++--------------- 2 files changed, 56 insertions(+), 33 deletions(-) diff --git a/tro_utils/cli.py b/tro_utils/cli.py index d23b542..1f1eaeb 100644 --- a/tro_utils/cli.py +++ b/tro_utils/cli.py @@ -1,5 +1,6 @@ """Console script for tro_utils.""" -import os + +import pathlib import sys import click @@ -34,7 +35,7 @@ def convert(self, value, param, ctx): if value in self.valid_strings: return value # Check if the value is a valid path - elif os.path.exists(value) and os.path.isfile(value): + elif pathlib.Path(value).exists() and pathlib.Path(value).is_file(): return value else: self.fail( @@ -142,7 +143,7 @@ def verify_timestamp(declaration): help="Show detailed information during verification", ) def verify_package(declaration, package, arrangement_id, subpath, verbose): - subpath = subpath if subpath else "" + subpath = subpath if subpath else None tro = TRO( filepath=declaration, ) @@ -160,7 +161,7 @@ def verify_package(declaration, package, arrangement_id, subpath, verbose): msg += f"::{subpath}" msg += "'" click.echo(msg, nl=False) - extra, mismatched, missing, success = tro.verify_replication_package( + missing, mismatched, extra, success = tro.verify_replication_package( arrangement, package, subpath ) if success: @@ -321,9 +322,8 @@ def sign(ctx): def generate_report(ctx, template, output): declaration = ctx.parent.params.get("declaration") if template in _TEMPLATES: - template = os.path.join( - os.path.dirname(__file__), _TEMPLATES[template]["filename"] - ) + template = pathlib.Path(__file__).parent / _TEMPLATES[template]["filename"] + template = str(template.resolve()) tro = TRO( filepath=declaration, ) diff --git a/tro_utils/tro_utils.py b/tro_utils/tro_utils.py index 41c304d..cb9268d 100644 --- a/tro_utils/tro_utils.py +++ b/tro_utils/tro_utils.py @@ -1,4 +1,5 @@ """Main module.""" + import base64 import hashlib import json @@ -42,10 +43,15 @@ def __init__( self.basename = "some_tro" self.dirname = "." else: - self.basename = os.path.basename(filepath).rsplit(".")[0] - self.dirname = os.path.dirname(filepath) or "." + filepath_obj = pathlib.Path(filepath) + self.basename = filepath_obj.stem + self.dirname = ( + str(filepath_obj.parent) + if filepath_obj.parent != pathlib.Path(".") + else "." + ) - if profile is not None and os.path.exists(profile): + if profile is not None and pathlib.Path(profile).exists(): print(f"Loading profile from {profile}") self.profile = json.load(open(profile)) else: @@ -55,7 +61,7 @@ def __init__( "trov:publicKey": None, } - if not os.path.exists(self.tro_filename): + if not pathlib.Path(self.tro_filename).exists(): self.data = { "@context": [ { @@ -100,9 +106,9 @@ def __init__( self.gpg = gnupg.GPG(gnupghome=GPG_HOME, verbose=False) if gpg_fingerprint: self.gpg_key_id = self.gpg.list_keys().key_map[gpg_fingerprint]["keyid"] - self.data["@graph"][0]["trov:wasAssembledBy"][ - "trov:publicKey" - ] = self.gpg.export_keys(self.gpg_key_id) + self.data["@graph"][0]["trov:wasAssembledBy"]["trov:publicKey"] = ( + self.gpg.export_keys(self.gpg_key_id) + ) if gpg_passphrase: self.gpg_passphrase = gpg_passphrase @@ -110,7 +116,7 @@ def __init__( def base_filename(self): if not self.basename: raise ValueError("basename is not set") - return os.path.abspath(os.path.join(self.dirname, self.basename)) + return str((pathlib.Path(self.dirname) / self.basename).resolve()) @property def tro_filename(self): @@ -232,7 +238,7 @@ def add_arrangement( for filepath, hash_value in hashes.items(): if hash_value in composition: continue - if os.path.islink(filepath): + if pathlib.Path(filepath).is_symlink(): mime_type = "inode/symlink" else: mime_type = ( @@ -276,7 +282,8 @@ def save(self): @staticmethod def sha256_for_file(filepath, resolve_symlinks=True): sha256 = hashlib.sha256() - if not os.path.isfile(filepath) or os.path.islink(filepath): + filepath_obj = pathlib.Path(filepath) + if not filepath_obj.is_file() or filepath_obj.is_symlink(): return "" with open(filepath, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): @@ -290,7 +297,7 @@ def sha256_for_directory(self, directory, ignore_dirs=None, resolve_symlinks=Tru for root, dirs, files in os.walk(directory): dirs[:] = [d for d in dirs if d not in ignore_dirs] for filename in files: - filepath = os.path.join(root, filename) + filepath = str(pathlib.Path(root) / filename) hash_value = self.sha256_for_file( filepath, resolve_symlinks=resolve_symlinks ) @@ -396,12 +403,16 @@ def verify_replication_package(self, arrangement_id, package, subpath=None): # Generator to yield (relative_filename, file_hash) tuples def iterate_package_files(): - if os.path.isdir(package): + if pathlib.Path(package).is_dir(): + package_path = pathlib.Path(package) for root, dirs, files in os.walk(package): for filename in files: - filepath = os.path.join(root, filename) - relative_filename = os.path.relpath(filepath, package) - file_hash = self.sha256_for_file(filepath) + filepath = pathlib.Path(root) / filename + # Use pathlib for robust cross-platform path handling + relative_filename = filepath.relative_to( + package_path + ).as_posix() + file_hash = self.sha256_for_file(str(filepath)) yield relative_filename, file_hash else: with zipfile.ZipFile(package, "r") as zf: @@ -418,9 +429,18 @@ def iterate_package_files(): # Handle subpath filtering if subpath is not None: - if not original_filename.startswith(subpath): + # Use pathlib for robust cross-platform subpath handling + original_path = pathlib.PurePosixPath(original_filename) + subpath_posix = pathlib.PurePosixPath(subpath) + + try: + # Check if path is relative to subpath + relative_filename = original_path.relative_to( + subpath_posix + ).as_posix() + except ValueError: + # Path is not under subpath, skip it continue - relative_filename = original_filename[len(subpath) :].lstrip("/") # Check if file exists in arrangement if relative_filename not in arrangement_map: @@ -436,9 +456,12 @@ def iterate_package_files(): or mismatched_hashes or len(arrangement_map) > 0 ) - return files_missing_in_arrangement, mismatched_hashes, list( - arrangement_map.keys() - ), not dirty + return ( + files_missing_in_arrangement, + mismatched_hashes, + list(arrangement_map.keys()), + not dirty, + ) def add_performance( self, @@ -580,13 +603,13 @@ def generate_report(self, template, report): arrangements[keys[n]]["artifacts"][location]["sha256"] != arrangements[keys[n - 1]]["artifacts"][location]["sha256"] ): - arrangements[keys[n]]["artifacts"][location][ - "status" - ] = "Changed" + arrangements[keys[n]]["artifacts"][location]["status"] = ( + "Changed" + ) else: - arrangements[keys[n]]["artifacts"][location][ - "status" - ] = "Unchanged" + arrangements[keys[n]]["artifacts"][location]["status"] = ( + "Unchanged" + ) else: arrangements[keys[n]]["artifacts"][location]["status"] = "Created"