From 1b5d74d69723d578109b400499b486fefb3fcbbc Mon Sep 17 00:00:00 2001 From: Simon Maillard Date: Thu, 16 Oct 2025 07:52:30 +0000 Subject: [PATCH 1/5] operations/facts: add GPG key management operations and facts - Add GpgFactBase, GpgKey, GpgKeys, GpgSecretKeys, and GpgKeyrings facts - Add gpg.key and gpg.dearmor operations for managing GPG keys - Support for keyserver fetching, local files, URLs, and key removal - Comprehensive test coverage for all GPG operations and facts --- src/pyinfra/facts/gpg.py | 68 ++++ src/pyinfra/operations/gpg.py | 327 ++++++++++++++++++ tests/facts/gpg.GpgKeyrings/asc_only.json | 59 ++++ .../gpg.GpgKeyrings/custom_directory.json | 26 ++ tests/facts/gpg.GpgKeyrings/default.json | 42 +++ .../gpg.GpgKeyrings/empty_directory.json | 7 + .../facts/gpg.GpgKeyrings/mixed_formats.json | 62 ++++ tests/operations/gpg.dearmor/basic.json | 22 ++ tests/operations/gpg.dearmor/custom_mode.json | 24 ++ tests/operations/gpg.key/custom_mode.json | 24 ++ .../gpg.key/keyserver_multiple.json | 28 ++ .../gpg.key/keyserver_no_keyid.json | 12 + .../operations/gpg.key/keyserver_single.json | 28 ++ tests/operations/gpg.key/local_file.json | 23 ++ tests/operations/gpg.key/no_dearmor.json | 24 ++ tests/operations/gpg.key/no_dest.json | 11 + .../gpg.key/no_src_or_keyserver.json | 11 + tests/operations/gpg.key/remote_url.json | 29 ++ tests/operations/gpg.key/remove_by_id.json | 32 ++ tests/operations/gpg.key/remove_file.json | 14 + .../gpg.key/remove_from_all_keyrings.json | 32 ++ .../gpg.key/remove_no_dest_no_keyid.json | 10 + 22 files changed, 915 insertions(+) create mode 100644 src/pyinfra/operations/gpg.py create mode 100644 tests/facts/gpg.GpgKeyrings/asc_only.json create mode 100644 tests/facts/gpg.GpgKeyrings/custom_directory.json create mode 100644 tests/facts/gpg.GpgKeyrings/default.json create mode 100644 tests/facts/gpg.GpgKeyrings/empty_directory.json create mode 100644 tests/facts/gpg.GpgKeyrings/mixed_formats.json create mode 100644 tests/operations/gpg.dearmor/basic.json create mode 100644 tests/operations/gpg.dearmor/custom_mode.json create mode 100644 tests/operations/gpg.key/custom_mode.json create mode 100644 tests/operations/gpg.key/keyserver_multiple.json create mode 100644 tests/operations/gpg.key/keyserver_no_keyid.json create mode 100644 tests/operations/gpg.key/keyserver_single.json create mode 100644 tests/operations/gpg.key/local_file.json create mode 100644 tests/operations/gpg.key/no_dearmor.json create mode 100644 tests/operations/gpg.key/no_dest.json create mode 100644 tests/operations/gpg.key/no_src_or_keyserver.json create mode 100644 tests/operations/gpg.key/remote_url.json create mode 100644 tests/operations/gpg.key/remove_by_id.json create mode 100644 tests/operations/gpg.key/remove_file.json create mode 100644 tests/operations/gpg.key/remove_from_all_keyrings.json create mode 100644 tests/operations/gpg.key/remove_no_dest_no_keyid.json diff --git a/src/pyinfra/facts/gpg.py b/src/pyinfra/facts/gpg.py index 3d9fdf97b..b88e06190 100644 --- a/src/pyinfra/facts/gpg.py +++ b/src/pyinfra/facts/gpg.py @@ -148,3 +148,71 @@ def command(self, keyring=None): return ("gpg --list-secret-keys --with-colons --keyring {0} --no-default-keyring").format( keyring, ) + + +class GpgKeyrings(GpgFactBase): + """ + Returns information on all GPG keyrings found in specified directories. + + .. code:: python + + { + "/etc/apt/keyrings/docker.gpg": { + "format": "gpg", + "keys": {...} # Same format as GpgKeys fact + } + } + """ + + @override + def command(self, directories): + if isinstance(directories, str): + directories = [directories] + + search_locations = " ".join(f'"{d}"' for d in directories) + + # Generate a command that finds keyrings and lists their keys + # We'll use a shell script that outputs keyring path followed by key info + return ( + f"for keyring in $(find {search_locations} -type f \\( -name '*.gpg' " + f"-o -name '*.asc' -o -name '*.kbx' \\) 2>/dev/null); do " + f'echo "KEYRING:$keyring"; ' + f'if [[ "$keyring" == *.asc ]]; then ' + f'gpg --with-colons "$keyring" 2>/dev/null || true; ' + f"else " + f'gpg --list-keys --with-colons --keyring "$keyring" --no-default-keyring 2>/dev/null || true; ' # noqa: E501 + f"fi; done" + ) + + @override + def process(self, output): + keyrings = {} + current_keyring = None + current_output: list[str] = [] + + for line in output: + line = line.strip() + if not line: + continue + + if line.startswith("KEYRING:"): + # Process previous keyring if exists + if current_keyring and current_output: + keyring_format = current_keyring.split(".")[-1].lower() + keys = super().process(current_output) + keyrings[current_keyring] = {"format": keyring_format, "keys": keys} + + # Start new keyring + current_keyring = line[8:] # Remove "KEYRING:" prefix + current_output = [] + else: + # Accumulate GPG output for current keyring + current_output.append(line) + + # Process final keyring + if current_keyring and current_output: + keyring_format = current_keyring.split(".")[-1].lower() + keys = super().process(current_output) + keyrings[current_keyring] = {"format": keyring_format, "keys": keys} + + return keyrings diff --git a/src/pyinfra/operations/gpg.py b/src/pyinfra/operations/gpg.py new file mode 100644 index 000000000..392493886 --- /dev/null +++ b/src/pyinfra/operations/gpg.py @@ -0,0 +1,327 @@ +""" +Manage GPG keys and keyrings. +""" + +from pathlib import PurePosixPath +from urllib.parse import urlparse + +from pyinfra import host +from pyinfra.api import OperationError, operation +from pyinfra.facts.gpg import GpgKeyrings + +from . import files + + +@operation() +def key( + src: str | None = None, + dest: str | None = None, + keyserver: str | None = None, + keyid: str | list[str] | None = None, + dearmor: bool = True, + mode: str = "0644", + present: bool = True, + working_dirs: list[str] | None = None, +): + """ + Install or remove GPG keys from various sources. + + Args: + src: filename or URL to a key (ASCII .asc or binary .gpg) + dest: destination path for the key file (required for installation, optional for removal) + keyserver: keyserver URL for fetching keys by ID + keyid: key ID or list of key IDs (required with keyserver, optional for removal) + dearmor: whether to convert ASCII armored keys to binary format + mode: file permissions for the installed key + present: whether the key should be present (True) or absent (False) + working_dirs: dirs to search for existing keyrings (required for removal without dest) + When False: if dest is provided, removes from specific keyring; + if dest is None, removes from keyrings found in working_dirs; + if keyid is provided, removes specific key(s); + if keyid is None, removes entire keyring file(s) + + Examples: + gpg.key( + name="Install Docker GPG key", + src="https://download.docker.com/linux/debian/gpg", + dest="/etc/apt/keyrings/docker.gpg", + ) + + gpg.key( + name="Remove old GPG key file", + dest="/etc/apt/keyrings/old-key.gpg", + present=False, + ) + + gpg.key( + name="Remove specific key by ID", + dest="/etc/apt/keyrings/vendor.gpg", + keyid="0xABCDEF12", + present=False, + ) + + gpg.key( + name="Remove key from specific directories", + keyid="0xCOMPROMISED123", + present=False, + working_dirs=["/etc/apt/keyrings", "/usr/share/keyrings"], + ) + + gpg.key( + name="Fetch keys from keyserver", + keyserver="hkps://keyserver.ubuntu.com", + keyid=["0xD88E42B4", "0x7EA0A9C3"], + dest="/etc/apt/keyrings/vendor.gpg", + ) + """ + + # Validate parameters based on operation type + if present is True: + # For installation, dest is required + if not dest: + raise OperationError("`dest` must be provided for installation") + elif present is False: + # For removal, either dest or (keyid and working_dirs) must be provided + if not dest and not (keyid and working_dirs): + raise OperationError( + "For removal, either `dest` or both `keyid` and `working_dirs` must be provided" + ) + + # For removal, handle different scenarios + if present is False: + if not dest and keyid: + # Remove key(s) from all keyrings found in specified directories + if isinstance(keyid, str): + keyid = [keyid] + + if not working_dirs: + raise OperationError( + "`working_dirs` must be provided when removing keys without `dest`" + ) + + # Use the GpgKeyrings fact to find all keyrings in specified directories + keyrings_info = host.get_fact(GpgKeyrings, directories=working_dirs) + + for keyring_path, keyring_data in keyrings_info.items(): + # Get the keys from the GpgKeyrings fact data + keys_in_keyring = keyring_data.get("keys", {}) + + # Check if any of the target keys exist in this keyring + keys_to_remove = [] + for kid in keyid: + # Handle different key ID formats (short, long, with/without 0x prefix) + clean_key = kid.replace("0x", "").replace("0X", "").upper() + + # Check for exact match or if the key ID is a suffix/prefix of any key + # in the keyring + for existing_key_id in keys_in_keyring.keys(): + if ( + clean_key == existing_key_id.upper() + or existing_key_id.upper().endswith(clean_key) + or existing_key_id.upper().startswith(clean_key) + ): + keys_to_remove.append(existing_key_id) + + if keys_to_remove: + # Remove the entire keyring file if any target keys are found + # This is the safest approach for keyring management + yield from files.file._inner( + path=keyring_path, + present=False, + ) + + return + + elif dest and keyid: + # Remove specific key(s) from a specific keyring file + if isinstance(keyid, str): + keyid = [keyid] + + # Check if the destination keyring exists and contains the target keys + keyrings_info = host.get_fact( + GpgKeyrings, directories=[str(PurePosixPath(dest).parent)] + ) + + if dest in keyrings_info: + keyring_data = keyrings_info[dest] + keys_in_keyring = keyring_data.get("keys", {}) + + # Check if any of the target keys exist in this keyring + keys_found = False + for kid in keyid: + clean_key = kid.replace("0x", "").replace("0X", "").upper() + for existing_key_id in keys_in_keyring.keys(): + # Check for exact match, suffix (short key ID), or prefix match + if ( + clean_key == existing_key_id.upper() + or existing_key_id.upper().endswith(clean_key) + or existing_key_id.upper().startswith(clean_key) + ): + keys_found = True + break + if keys_found: + break + + if keys_found: + # Remove the entire keyring file - safest approach for keyring management + yield from files.file._inner( + path=dest, + present=False, + ) + return + + elif dest and not keyid: + # Remove entire keyring file + yield from files.file._inner( + path=dest, + present=False, + ) + return + + else: + raise OperationError("Invalid parameters for removal operation") + + # For installation, validate required parameters + if not src and not keyserver: + raise OperationError("Either `src` or `keyserver` must be provided for installation") + + if keyserver and not keyid: + raise OperationError("`keyid` must be provided with `keyserver`") + + if keyid and not keyserver and not src: + raise OperationError( + "When using `keyid` for installation, either `keyserver` or `src` must be provided" + ) + + # For installation (present=True), ensure destination directory exists + if dest is None: + raise OperationError("dest is required for installation") + + dest_dir = str(PurePosixPath(dest).parent) + yield from files.directory._inner( + path=dest_dir, + mode="0755", + present=True, + ) + + # --- src branch: install a key from URL or local file --- + if src: + if urlparse(src).scheme in ("http", "https"): + # Remote source: download first, then process + temp_file = host.get_temp_filename(src) + + yield from files.download._inner( + src=src, + dest=temp_file, + ) + + # Install the key and clean up temp file + yield from _install_key_file(temp_file, dest, dearmor, mode) + + # Clean up temp file using pyinfra + yield from files.file._inner( + path=temp_file, + present=False, + ) + else: + # Local file: install directly + yield from _install_key_file(src, dest, dearmor, mode) + + # --- keyserver branch: fetch keys by ID --- + if keyserver: + if keyid is None: + raise OperationError("`keyid` must be provided with `keyserver`") + + if isinstance(keyid, str): + keyid = [keyid] + + joined = " ".join(keyid) + + # Create temporary GPG home directory + temp_dir = f"/tmp/pyinfra-gpg-{host.get_temp_filename('')[-8:]}" + + yield from files.directory._inner( + path=temp_dir, + mode="0700", # GPG directories should be more restrictive + present=True, + ) + + # Export GNUPGHOME and fetch keys + yield f'export GNUPGHOME="{temp_dir}" && gpg --batch --keyserver "{keyserver}" --recv-keys {joined}' # noqa: E501 + + # Export keys to destination - always use direct binary export + # gpg --export produces binary format by default, no dearmoring needed + yield (f'export GNUPGHOME="{temp_dir}" && ' f'gpg --batch --export {joined} > "{dest}"') + + # Clean up temporary directory + yield from files.directory._inner( + path=temp_dir, + present=False, + ) + + # Set proper permissions + yield from files.file._inner( + path=dest, + mode=mode, + present=True, + ) + + +@operation() +def dearmor(src: str, dest: str, mode: str = "0644"): + """ + Convert ASCII armored GPG key to binary format. + + Args: + src: source ASCII armored key file + dest: destination binary key file + mode: file permissions for the output file + + Example: + gpg.dearmor( + name="Convert key to binary", + src="/tmp/key.asc", + dest="/etc/apt/keyrings/key.gpg", + ) + """ + + # Ensure destination directory exists + dest_dir = str(PurePosixPath(dest).parent) + yield from files.directory._inner( + path=dest_dir, + mode="0755", + present=True, + ) + + yield f'gpg --batch --dearmor -o "{dest}" "{src}"' + + # Set proper permissions + yield from files.file._inner( + path=dest, + mode=mode, + present=True, + ) + + +def _install_key_file(src_file: str, dest_path: str, dearmor: bool, mode: str): + """ + Helper function to install a GPG key file, dearmoring if necessary. + """ + if dearmor: + # Check if it's an ASCII armored key and handle accordingly + # Note: Could be enhanced using GpgKey fact + yield ( + f'if grep -q "BEGIN PGP PUBLIC KEY BLOCK" "{src_file}"; then ' + f'gpg --batch --dearmor -o "{dest_path}" "{src_file}"; ' + f'else cp "{src_file}" "{dest_path}"; fi' + ) + else: + # Simple copy for binary keys or when dearmoring is disabled + yield f'cp "{src_file}" "{dest_path}"' + + # Set proper permissions + yield from files.file._inner( + path=dest_path, + mode=mode, + present=True, + ) diff --git a/tests/facts/gpg.GpgKeyrings/asc_only.json b/tests/facts/gpg.GpgKeyrings/asc_only.json new file mode 100644 index 000000000..6804b7526 --- /dev/null +++ b/tests/facts/gpg.GpgKeyrings/asc_only.json @@ -0,0 +1,59 @@ +{ + "command": "for keyring in $(find \"/etc/apt/trusted.gpg.d\" -type f \\( -name '*.gpg' -o -name '*.asc' -o -name '*.kbx' \\) 2>/dev/null); do echo \"KEYRING:$keyring\"; if [[ \"$keyring\" == *.asc ]]; then gpg --with-colons \"$keyring\" 2>/dev/null || true; else gpg --list-keys --with-colons --keyring \"$keyring\" --no-default-keyring 2>/dev/null || true; fi; done", + "requires_command": "gpg", + "arg": ["/etc/apt/trusted.gpg.d"], + "output": [ + "KEYRING:/etc/apt/trusted.gpg.d/debian-archive-bookworm-automatic.asc", + "pub:-:4096:1:B7C5D7D6350947F8:1663251644:::-:::scSC::::::23::0:", + "fpr:::::::::4D64390375060AA4D3E84D8FB7C5D7D6350947F8:", + "uid:-::::1663251644::7B1EFED0F7198D8488DD53F0E95BB93FC4D0A10C::Debian Stable Release Key (12/bookworm) ::::::::::0:", + "sub:-:4096:1:16E0B0B5FC11C3A7:1663251644:::-:::e::::::23:", + "fpr:::::::::CFD35E5D1F5CBDE15AE59C5E16E0B0B5FC11C3A7:", + "KEYRING:/etc/apt/trusted.gpg.d/debian-archive-bookworm-stable.asc", + "pub:-:4096:1:4F368D5D67269BAC:1663251677:::-:::scSC::::::23::0:", + "fpr:::::::::80E46CDDD2798E51FB74F84E4F368D5D67269BAC:", + "uid:-::::1663251677::BB28BDBA35F9A1A92F44E4C96A9E86FC66F9D4E6::Debian Archive Automatic Signing Key (12/bookworm) ::::::::::0:", + "sub:-:4096:1:D7AB25251C0A3EE3:1663251677:::-:::e::::::23:", + "fpr:::::::::C5691F97B9C30A5CF0A3B1F9D7AB25251C0A3EE3:" + ], + "fact": { + "/etc/apt/trusted.gpg.d/debian-archive-bookworm-automatic.asc": { + "format": "asc", + "keys": { + "B7C5D7D6350947F8": { + "validity": "-", + "length": 4096, + "subkeys": { + "16E0B0B5FC11C3A7": { + "validity": "-", + "length": 4096, + "fingerprint": "CFD35E5D1F5CBDE15AE59C5E16E0B0B5FC11C3A7" + } + }, + "fingerprint": "4D64390375060AA4D3E84D8FB7C5D7D6350947F8", + "uid_hash": "7B1EFED0F7198D8488DD53F0E95BB93FC4D0A10C", + "uid": "Debian Stable Release Key (12/bookworm) " + } + } + }, + "/etc/apt/trusted.gpg.d/debian-archive-bookworm-stable.asc": { + "format": "asc", + "keys": { + "4F368D5D67269BAC": { + "validity": "-", + "length": 4096, + "subkeys": { + "D7AB25251C0A3EE3": { + "validity": "-", + "length": 4096, + "fingerprint": "C5691F97B9C30A5CF0A3B1F9D7AB25251C0A3EE3" + } + }, + "fingerprint": "80E46CDDD2798E51FB74F84E4F368D5D67269BAC", + "uid_hash": "BB28BDBA35F9A1A92F44E4C96A9E86FC66F9D4E6", + "uid": "Debian Archive Automatic Signing Key (12/bookworm) " + } + } + } + } +} diff --git a/tests/facts/gpg.GpgKeyrings/custom_directory.json b/tests/facts/gpg.GpgKeyrings/custom_directory.json new file mode 100644 index 000000000..884ce59e0 --- /dev/null +++ b/tests/facts/gpg.GpgKeyrings/custom_directory.json @@ -0,0 +1,26 @@ +{ + "command": "for keyring in $(find \"/custom/keyring/path\" -type f \\( -name '*.gpg' -o -name '*.asc' -o -name '*.kbx' \\) 2>/dev/null); do echo \"KEYRING:$keyring\"; if [[ \"$keyring\" == *.asc ]]; then gpg --with-colons \"$keyring\" 2>/dev/null || true; else gpg --list-keys --with-colons --keyring \"$keyring\" --no-default-keyring 2>/dev/null || true; fi; done", + "requires_command": "gpg", + "arg": ["/custom/keyring/path"], + "output": [ + "KEYRING:/custom/keyring/path/custom.asc", + "pub:-:4096:1:9A3B3C4D5E6F7890:1622505600:::-:::scSC::::::23::0:", + "fpr:::::::::1234567890ABCDEF1234567890ABCDEF12345678:", + "uid:-::::1622505600::::Custom Key ::::::::::0:" + ], + "fact": { + "/custom/keyring/path/custom.asc": { + "format": "asc", + "keys": { + "9A3B3C4D5E6F7890": { + "validity": "-", + "length": 4096, + "subkeys": {}, + "fingerprint": "1234567890ABCDEF1234567890ABCDEF12345678", + "uid_hash": "", + "uid": "Custom Key " + } + } + } + } +} diff --git a/tests/facts/gpg.GpgKeyrings/default.json b/tests/facts/gpg.GpgKeyrings/default.json new file mode 100644 index 000000000..5bce9cf17 --- /dev/null +++ b/tests/facts/gpg.GpgKeyrings/default.json @@ -0,0 +1,42 @@ +{ + "command": "for keyring in $(find \"/etc/apt/trusted.gpg.d\" \"/etc/apt/keyrings\" \"/usr/share/keyrings\" -type f \\( -name '*.gpg' -o -name '*.asc' -o -name '*.kbx' \\) 2>/dev/null); do echo \"KEYRING:$keyring\"; if [[ \"$keyring\" == *.asc ]]; then gpg --with-colons \"$keyring\" 2>/dev/null || true; else gpg --list-keys --with-colons --keyring \"$keyring\" --no-default-keyring 2>/dev/null || true; fi; done", + "requires_command": "gpg", + "arg": [["/etc/apt/trusted.gpg.d", "/etc/apt/keyrings", "/usr/share/keyrings"]], + "output": [ + "KEYRING:/etc/apt/keyrings/docker.gpg", + "tru:t:1:1601454628:0:3:1:5", + "pub:-:4096:1:ABCD1234EFGH5678:1336770936:::-:::scSC::::::23::0:", + "fpr:::::::::ABCD1234EFGH5678IJKL9012MNOP3456QRST7890:", + "uid:-::::1336770936::ABC123DEF456::Docker Release (CE deb) ::::::::::0:", + "KEYRING:/etc/apt/trusted.gpg.d/ubuntu-keyring.asc", + "pub:-:4096:1:1234ABCD5678EFGH:1336774248:::-:::scSC::::::23::0:", + "uid:-::::1336774248::::Ubuntu Archive Signing Key ::::::::::0:" + ], + "fact": { + "/etc/apt/keyrings/docker.gpg": { + "format": "gpg", + "keys": { + "ABCD1234EFGH5678": { + "validity": "-", + "length": 4096, + "subkeys": {}, + "fingerprint": "ABCD1234EFGH5678IJKL9012MNOP3456QRST7890", + "uid_hash": "ABC123DEF456", + "uid": "Docker Release (CE deb) " + } + } + }, + "/etc/apt/trusted.gpg.d/ubuntu-keyring.asc": { + "format": "asc", + "keys": { + "1234ABCD5678EFGH": { + "validity": "-", + "length": 4096, + "subkeys": {}, + "uid_hash": "", + "uid": "Ubuntu Archive Signing Key " + } + } + } + } +} diff --git a/tests/facts/gpg.GpgKeyrings/empty_directory.json b/tests/facts/gpg.GpgKeyrings/empty_directory.json new file mode 100644 index 000000000..51f1c4eaf --- /dev/null +++ b/tests/facts/gpg.GpgKeyrings/empty_directory.json @@ -0,0 +1,7 @@ +{ + "command": "for keyring in $(find \"/empty/directory\" -type f \\( -name '*.gpg' -o -name '*.asc' -o -name '*.kbx' \\) 2>/dev/null); do echo \"KEYRING:$keyring\"; if [[ \"$keyring\" == *.asc ]]; then gpg --with-colons \"$keyring\" 2>/dev/null || true; else gpg --list-keys --with-colons --keyring \"$keyring\" --no-default-keyring 2>/dev/null || true; fi; done", + "requires_command": "gpg", + "arg": ["/empty/directory"], + "output": [], + "fact": {} +} diff --git a/tests/facts/gpg.GpgKeyrings/mixed_formats.json b/tests/facts/gpg.GpgKeyrings/mixed_formats.json new file mode 100644 index 000000000..ea649e7a2 --- /dev/null +++ b/tests/facts/gpg.GpgKeyrings/mixed_formats.json @@ -0,0 +1,62 @@ +{ + "command": "for keyring in $(find \"/etc/apt/trusted.gpg.d\" -type f \\( -name '*.gpg' -o -name '*.asc' -o -name '*.kbx' \\) 2>/dev/null); do echo \"KEYRING:$keyring\"; if [[ \"$keyring\" == *.asc ]]; then gpg --with-colons \"$keyring\" 2>/dev/null || true; else gpg --list-keys --with-colons --keyring \"$keyring\" --no-default-keyring 2>/dev/null || true; fi; done", + "requires_command": "gpg", + "arg": ["/etc/apt/trusted.gpg.d"], + "output": [ + "KEYRING:/etc/apt/trusted.gpg.d/debian-archive.gpg", + "tru:t:1:1601454628:0:3:1:5", + "pub:-:4096:1:73A4F27B8DD47936:1663251644:::-:::scSC::::::23::0:", + "fpr:::::::::4D64390375060AA4D3E84D8F73A4F27B8DD47936:", + "uid:-::::1663251644::7B1EFED0F7198D8488DD53F0E95BB93FC4D0A10C::Debian Archive Automatic Signing Key (11/bullseye) ::::::::::0:", + "KEYRING:/etc/apt/trusted.gpg.d/ubuntu-keyring.asc", + "pub:-:4096:1:1E9377A2BA9EF27F:1336770936:::-:::scSC::::::23::0:", + "fpr:::::::::790BC7277767219C42C86F931E9377A2BA9EF27F:", + "uid:-::::1336770936::::Ubuntu Archive Signing Key ::::::::::0:", + "KEYRING:/etc/apt/trusted.gpg.d/docker.kbx", + "tru:t:1:1601454628:0:3:1:5", + "pub:-:4096:1:9DC858229FC7DD38:1498167007:::-:::scSC::::::23::0:", + "fpr:::::::::9DC858229FC7DD38B3088BD89DC858229FC7DD38:", + "uid:-::::1498167007::::Docker Release (CE deb) ::::::::::0:" + ], + "fact": { + "/etc/apt/trusted.gpg.d/debian-archive.gpg": { + "format": "gpg", + "keys": { + "73A4F27B8DD47936": { + "validity": "-", + "length": 4096, + "subkeys": {}, + "fingerprint": "4D64390375060AA4D3E84D8F73A4F27B8DD47936", + "uid_hash": "7B1EFED0F7198D8488DD53F0E95BB93FC4D0A10C", + "uid": "Debian Archive Automatic Signing Key (11/bullseye) " + } + } + }, + "/etc/apt/trusted.gpg.d/ubuntu-keyring.asc": { + "format": "asc", + "keys": { + "1E9377A2BA9EF27F": { + "validity": "-", + "length": 4096, + "subkeys": {}, + "fingerprint": "790BC7277767219C42C86F931E9377A2BA9EF27F", + "uid_hash": "", + "uid": "Ubuntu Archive Signing Key " + } + } + }, + "/etc/apt/trusted.gpg.d/docker.kbx": { + "format": "kbx", + "keys": { + "9DC858229FC7DD38": { + "validity": "-", + "length": 4096, + "subkeys": {}, + "fingerprint": "9DC858229FC7DD38B3088BD89DC858229FC7DD38", + "uid_hash": "", + "uid": "Docker Release (CE deb) " + } + } + } + } +} diff --git a/tests/operations/gpg.dearmor/basic.json b/tests/operations/gpg.dearmor/basic.json new file mode 100644 index 000000000..62fa745e4 --- /dev/null +++ b/tests/operations/gpg.dearmor/basic.json @@ -0,0 +1,22 @@ +{ + "args": [], + "kwargs": { + "src": "/tmp/test.asc", + "dest": "/tmp/test.gpg" + }, + "facts": { + "files.Directory": { + "path=/tmp": { + "mode": 755 + } + }, + "files.File": { + "path=/tmp/test.gpg": null + } + }, + "commands": [ + "gpg --batch --dearmor -o \"/tmp/test.gpg\" \"/tmp/test.asc\"", + "touch /tmp/test.gpg", + "chmod 644 /tmp/test.gpg" + ] +} diff --git a/tests/operations/gpg.dearmor/custom_mode.json b/tests/operations/gpg.dearmor/custom_mode.json new file mode 100644 index 000000000..48e3f6415 --- /dev/null +++ b/tests/operations/gpg.dearmor/custom_mode.json @@ -0,0 +1,24 @@ +{ + "args": [], + "kwargs": { + "src": "/tmp/key.asc", + "dest": "/etc/apt/keyrings/key.gpg", + "mode": "0600" + }, + "facts": { + "files.Directory": { + "path=/etc/apt/keyrings": null + }, + "files.File": { + "path=/etc/apt/keyrings/key.gpg": null + } + }, + "commands": [ + "mkdir -p /etc/apt/keyrings", + "chmod 755 /etc/apt/keyrings", + "gpg --batch --dearmor -o \"/etc/apt/keyrings/key.gpg\" \"/tmp/key.asc\"", + "mkdir -p /etc/apt/keyrings", + "touch /etc/apt/keyrings/key.gpg", + "chmod 600 /etc/apt/keyrings/key.gpg" + ] +} diff --git a/tests/operations/gpg.key/custom_mode.json b/tests/operations/gpg.key/custom_mode.json new file mode 100644 index 000000000..2de7e69f0 --- /dev/null +++ b/tests/operations/gpg.key/custom_mode.json @@ -0,0 +1,24 @@ +{ + "args": [], + "kwargs": { + "src": "/tmp/local-key.asc", + "dest": "/etc/apt/keyrings/test.gpg", + "mode": "0600" + }, + "facts": { + "files.Directory": { + "path=/etc/apt/keyrings": null + }, + "files.File": { + "path=/etc/apt/keyrings/test.gpg": null + } + }, + "commands": [ + "mkdir -p /etc/apt/keyrings", + "chmod 755 /etc/apt/keyrings", + "if grep -q \"BEGIN PGP PUBLIC KEY BLOCK\" \"/tmp/local-key.asc\"; then gpg --batch --dearmor -o \"/etc/apt/keyrings/test.gpg\" \"/tmp/local-key.asc\"; else cp \"/tmp/local-key.asc\" \"/etc/apt/keyrings/test.gpg\"; fi", + "mkdir -p /etc/apt/keyrings", + "touch /etc/apt/keyrings/test.gpg", + "chmod 600 /etc/apt/keyrings/test.gpg" + ] +} diff --git a/tests/operations/gpg.key/keyserver_multiple.json b/tests/operations/gpg.key/keyserver_multiple.json new file mode 100644 index 000000000..571a57866 --- /dev/null +++ b/tests/operations/gpg.key/keyserver_multiple.json @@ -0,0 +1,28 @@ +{ + "args": [], + "kwargs": { + "keyserver": "hkps://keyserver.ubuntu.com", + "keyid": ["0xD88E42B4", "0x7EA0A9C3"], + "dest": "/etc/apt/keyrings/vendor.gpg" + }, + "facts": { + "files.Directory": { + "path=/etc/apt/keyrings": null, + "path=/tmp/pyinfra-gpg-empfile_": null + }, + "files.File": { + "path=/etc/apt/keyrings/vendor.gpg": null + } + }, + "commands": [ + "mkdir -p /etc/apt/keyrings", + "chmod 755 /etc/apt/keyrings", + "mkdir -p /tmp/pyinfra-gpg-empfile_", + "chmod 700 /tmp/pyinfra-gpg-empfile_", + "export GNUPGHOME=\"/tmp/pyinfra-gpg-empfile_\" && gpg --batch --keyserver \"hkps://keyserver.ubuntu.com\" --recv-keys 0xD88E42B4 0x7EA0A9C3", + "export GNUPGHOME=\"/tmp/pyinfra-gpg-empfile_\" && gpg --batch --export 0xD88E42B4 0x7EA0A9C3 > \"/etc/apt/keyrings/vendor.gpg\"", + "mkdir -p /etc/apt/keyrings", + "touch /etc/apt/keyrings/vendor.gpg", + "chmod 644 /etc/apt/keyrings/vendor.gpg" + ] +} diff --git a/tests/operations/gpg.key/keyserver_no_keyid.json b/tests/operations/gpg.key/keyserver_no_keyid.json new file mode 100644 index 000000000..f6bbf5bbe --- /dev/null +++ b/tests/operations/gpg.key/keyserver_no_keyid.json @@ -0,0 +1,12 @@ +{ + "args": [], + "kwargs": { + "keyserver": "hkps://keyserver.ubuntu.com", + "dest": "/etc/apt/keyrings/test.gpg" + }, + "exception": { + "names": ["OperationError"], + "message": "`keyid` must be provided with `keyserver`" + }, + "facts": {} +} diff --git a/tests/operations/gpg.key/keyserver_single.json b/tests/operations/gpg.key/keyserver_single.json new file mode 100644 index 000000000..26fcb4e7c --- /dev/null +++ b/tests/operations/gpg.key/keyserver_single.json @@ -0,0 +1,28 @@ +{ + "args": [], + "kwargs": { + "keyserver": "hkps://keyserver.ubuntu.com", + "keyid": ["0xD88E42B4"], + "dest": "/etc/apt/keyrings/vendor.gpg" + }, + "facts": { + "files.Directory": { + "path=/etc/apt/keyrings": null, + "path=/tmp/pyinfra-gpg-empfile_": null + }, + "files.File": { + "path=/etc/apt/keyrings/vendor.gpg": null + } + }, + "commands": [ + "mkdir -p /etc/apt/keyrings", + "chmod 755 /etc/apt/keyrings", + "mkdir -p /tmp/pyinfra-gpg-empfile_", + "chmod 700 /tmp/pyinfra-gpg-empfile_", + "export GNUPGHOME=\"/tmp/pyinfra-gpg-empfile_\" && gpg --batch --keyserver \"hkps://keyserver.ubuntu.com\" --recv-keys 0xD88E42B4", + "export GNUPGHOME=\"/tmp/pyinfra-gpg-empfile_\" && gpg --batch --export 0xD88E42B4 > \"/etc/apt/keyrings/vendor.gpg\"", + "mkdir -p /etc/apt/keyrings", + "touch /etc/apt/keyrings/vendor.gpg", + "chmod 644 /etc/apt/keyrings/vendor.gpg" + ] +} diff --git a/tests/operations/gpg.key/local_file.json b/tests/operations/gpg.key/local_file.json new file mode 100644 index 000000000..ba86d06fa --- /dev/null +++ b/tests/operations/gpg.key/local_file.json @@ -0,0 +1,23 @@ +{ + "args": [], + "kwargs": { + "src": "/tmp/local-key.asc", + "dest": "/etc/apt/keyrings/test.gpg" + }, + "facts": { + "files.Directory": { + "path=/etc/apt/keyrings": null + }, + "files.File": { + "path=/etc/apt/keyrings/test.gpg": null + } + }, + "commands": [ + "mkdir -p /etc/apt/keyrings", + "chmod 755 /etc/apt/keyrings", + "if grep -q \"BEGIN PGP PUBLIC KEY BLOCK\" \"/tmp/local-key.asc\"; then gpg --batch --dearmor -o \"/etc/apt/keyrings/test.gpg\" \"/tmp/local-key.asc\"; else cp \"/tmp/local-key.asc\" \"/etc/apt/keyrings/test.gpg\"; fi", + "mkdir -p /etc/apt/keyrings", + "touch /etc/apt/keyrings/test.gpg", + "chmod 644 /etc/apt/keyrings/test.gpg" + ] +} diff --git a/tests/operations/gpg.key/no_dearmor.json b/tests/operations/gpg.key/no_dearmor.json new file mode 100644 index 000000000..4095c6d05 --- /dev/null +++ b/tests/operations/gpg.key/no_dearmor.json @@ -0,0 +1,24 @@ +{ + "args": [], + "kwargs": { + "src": "/tmp/local-key.gpg", + "dest": "/etc/apt/keyrings/test.gpg", + "dearmor": false + }, + "facts": { + "files.Directory": { + "path=/etc/apt/keyrings": null + }, + "files.File": { + "path=/etc/apt/keyrings/test.gpg": null + } + }, + "commands": [ + "mkdir -p /etc/apt/keyrings", + "chmod 755 /etc/apt/keyrings", + "cp \"/tmp/local-key.gpg\" \"/etc/apt/keyrings/test.gpg\"", + "mkdir -p /etc/apt/keyrings", + "touch /etc/apt/keyrings/test.gpg", + "chmod 644 /etc/apt/keyrings/test.gpg" + ] +} diff --git a/tests/operations/gpg.key/no_dest.json b/tests/operations/gpg.key/no_dest.json new file mode 100644 index 000000000..134562316 --- /dev/null +++ b/tests/operations/gpg.key/no_dest.json @@ -0,0 +1,11 @@ +{ + "args": [], + "kwargs": { + "src": "/tmp/test.asc" + }, + "facts": {}, + "exception": { + "names": ["OperationError"], + "message": "`dest` must be provided for installation" + } +} diff --git a/tests/operations/gpg.key/no_src_or_keyserver.json b/tests/operations/gpg.key/no_src_or_keyserver.json new file mode 100644 index 000000000..786d22d1c --- /dev/null +++ b/tests/operations/gpg.key/no_src_or_keyserver.json @@ -0,0 +1,11 @@ +{ + "args": [], + "kwargs": { + "dest": "/etc/apt/keyrings/test.gpg" + }, + "exception": { + "names": ["OperationError"], + "message": "Either `src` or `keyserver` must be provided for installation" + }, + "facts": {} +} diff --git a/tests/operations/gpg.key/remote_url.json b/tests/operations/gpg.key/remote_url.json new file mode 100644 index 000000000..509887e86 --- /dev/null +++ b/tests/operations/gpg.key/remote_url.json @@ -0,0 +1,29 @@ +{ + "args": [], + "kwargs": { + "src": "https://example.com/key.asc", + "dest": "/etc/apt/keyrings/test.gpg" + }, + "facts": { + "files.Directory": { + "path=/etc/apt/keyrings": null + }, + "files.File": { + "path=/etc/apt/keyrings/test.gpg": null, + "path=_tempfile_": null + }, + "server.Which": { + "command=curl": "/usr/bin/curl" + } + }, + "commands": [ + "mkdir -p /etc/apt/keyrings", + "chmod 755 /etc/apt/keyrings", + "curl -sSLf https://example.com/key.asc -o _tempfile_", + "mv _tempfile_ _tempfile_", + "if grep -q \"BEGIN PGP PUBLIC KEY BLOCK\" \"_tempfile_\"; then gpg --batch --dearmor -o \"/etc/apt/keyrings/test.gpg\" \"_tempfile_\"; else cp \"_tempfile_\" \"/etc/apt/keyrings/test.gpg\"; fi", + "mkdir -p /etc/apt/keyrings", + "touch /etc/apt/keyrings/test.gpg", + "chmod 644 /etc/apt/keyrings/test.gpg" + ] +} diff --git a/tests/operations/gpg.key/remove_by_id.json b/tests/operations/gpg.key/remove_by_id.json new file mode 100644 index 000000000..8d6e7eb98 --- /dev/null +++ b/tests/operations/gpg.key/remove_by_id.json @@ -0,0 +1,32 @@ +{ + "kwargs": { + "dest": "/etc/apt/keyrings/vendor.gpg", + "keyid": "0xABCDEF12", + "present": false + }, + "facts": { + "gpg.GpgKeyrings": { + "directories=['/etc/apt/keyrings']": { + "/etc/apt/keyrings/vendor.gpg": { + "format": "gpg", + "keys": { + "ABCDEF1234567890": { + "validity": "-", + "length": 4096, + "subkeys": {}, + "fingerprint": "ABCDEF1234567890FEDCBA0987654321ABCDEF12", + "uid_hash": "ABC123DEF456", + "uid": "Vendor Key " + } + } + } + } + }, + "files.File": { + "path=/etc/apt/keyrings/vendor.gpg": {"mode": 644} + } + }, + "commands": [ + "rm -f /etc/apt/keyrings/vendor.gpg" + ] +} diff --git a/tests/operations/gpg.key/remove_file.json b/tests/operations/gpg.key/remove_file.json new file mode 100644 index 000000000..ac9adcb9e --- /dev/null +++ b/tests/operations/gpg.key/remove_file.json @@ -0,0 +1,14 @@ +{ + "kwargs": { + "dest": "/etc/apt/keyrings/test.gpg", + "present": false + }, + "facts": { + "files.File": { + "path=/etc/apt/keyrings/test.gpg": {"mode": 644} + } + }, + "commands": [ + "rm -f /etc/apt/keyrings/test.gpg" + ] +} diff --git a/tests/operations/gpg.key/remove_from_all_keyrings.json b/tests/operations/gpg.key/remove_from_all_keyrings.json new file mode 100644 index 000000000..62a932659 --- /dev/null +++ b/tests/operations/gpg.key/remove_from_all_keyrings.json @@ -0,0 +1,32 @@ +{ + "kwargs": { + "keyid": "0xCOMPROMISED123", + "present": false, + "working_dirs": ["/etc/apt/trusted.gpg.d", "/etc/apt/keyrings", "/usr/share/keyrings"] + }, + "facts": { + "gpg.GpgKeyrings": { + "directories=['/etc/apt/trusted.gpg.d', '/etc/apt/keyrings', '/usr/share/keyrings']": { + "/etc/apt/trusted.gpg.d/compromised.gpg": { + "format": "gpg", + "keys": { + "COMPROMISED123567890": { + "validity": "-", + "length": 4096, + "subkeys": {}, + "fingerprint": "COMPROMISED123567890FEDCBA0987654321COMPROMISED123", + "uid_hash": "ABC123DEF456", + "uid": "Compromised Key " + } + } + } + } + }, + "files.File": { + "path=/etc/apt/trusted.gpg.d/compromised.gpg": {"mode": 644} + } + }, + "commands": [ + "rm -f /etc/apt/trusted.gpg.d/compromised.gpg" + ] +} diff --git a/tests/operations/gpg.key/remove_no_dest_no_keyid.json b/tests/operations/gpg.key/remove_no_dest_no_keyid.json new file mode 100644 index 000000000..03225013d --- /dev/null +++ b/tests/operations/gpg.key/remove_no_dest_no_keyid.json @@ -0,0 +1,10 @@ +{ + "kwargs": { + "present": false + }, + "exception": { + "names": ["OperationError"], + "message": "For removal, either `dest` or both `keyid` and `working_dirs` must be provided" + }, + "facts": {} +} From 55aca143a85a2db259ba55ad77e096b80b0adb3a Mon Sep 17 00:00:00 2001 From: Simon Maillard Date: Thu, 16 Oct 2025 07:52:30 +0000 Subject: [PATCH 2/5] operations/facts: add GPG key management operations and facts - Add GpgFactBase, GpgKey, GpgKeys, GpgSecretKeys, and GpgKeyrings facts - Add gpg.key and gpg.dearmor operations for managing GPG keys - Support for keyserver fetching, local files, URLs, and key removal - Comprehensive test coverage for all GPG operations and facts --- src/pyinfra/facts/gpg.py | 68 ++++ src/pyinfra/operations/gpg.py | 327 ++++++++++++++++++ tests/facts/gpg.GpgKeyrings/asc_only.json | 59 ++++ .../gpg.GpgKeyrings/custom_directory.json | 26 ++ tests/facts/gpg.GpgKeyrings/default.json | 42 +++ .../gpg.GpgKeyrings/empty_directory.json | 7 + .../facts/gpg.GpgKeyrings/mixed_formats.json | 62 ++++ tests/operations/gpg.dearmor/basic.json | 22 ++ tests/operations/gpg.dearmor/custom_mode.json | 24 ++ tests/operations/gpg.key/custom_mode.json | 24 ++ .../gpg.key/keyserver_multiple.json | 28 ++ .../gpg.key/keyserver_no_keyid.json | 12 + .../operations/gpg.key/keyserver_single.json | 28 ++ tests/operations/gpg.key/local_file.json | 23 ++ tests/operations/gpg.key/no_dearmor.json | 24 ++ tests/operations/gpg.key/no_dest.json | 11 + .../gpg.key/no_src_or_keyserver.json | 11 + tests/operations/gpg.key/remote_url.json | 29 ++ tests/operations/gpg.key/remove_by_id.json | 32 ++ tests/operations/gpg.key/remove_file.json | 14 + .../gpg.key/remove_from_all_keyrings.json | 32 ++ .../gpg.key/remove_no_dest_no_keyid.json | 10 + 22 files changed, 915 insertions(+) create mode 100644 src/pyinfra/operations/gpg.py create mode 100644 tests/facts/gpg.GpgKeyrings/asc_only.json create mode 100644 tests/facts/gpg.GpgKeyrings/custom_directory.json create mode 100644 tests/facts/gpg.GpgKeyrings/default.json create mode 100644 tests/facts/gpg.GpgKeyrings/empty_directory.json create mode 100644 tests/facts/gpg.GpgKeyrings/mixed_formats.json create mode 100644 tests/operations/gpg.dearmor/basic.json create mode 100644 tests/operations/gpg.dearmor/custom_mode.json create mode 100644 tests/operations/gpg.key/custom_mode.json create mode 100644 tests/operations/gpg.key/keyserver_multiple.json create mode 100644 tests/operations/gpg.key/keyserver_no_keyid.json create mode 100644 tests/operations/gpg.key/keyserver_single.json create mode 100644 tests/operations/gpg.key/local_file.json create mode 100644 tests/operations/gpg.key/no_dearmor.json create mode 100644 tests/operations/gpg.key/no_dest.json create mode 100644 tests/operations/gpg.key/no_src_or_keyserver.json create mode 100644 tests/operations/gpg.key/remote_url.json create mode 100644 tests/operations/gpg.key/remove_by_id.json create mode 100644 tests/operations/gpg.key/remove_file.json create mode 100644 tests/operations/gpg.key/remove_from_all_keyrings.json create mode 100644 tests/operations/gpg.key/remove_no_dest_no_keyid.json diff --git a/src/pyinfra/facts/gpg.py b/src/pyinfra/facts/gpg.py index 3d9fdf97b..b88e06190 100644 --- a/src/pyinfra/facts/gpg.py +++ b/src/pyinfra/facts/gpg.py @@ -148,3 +148,71 @@ def command(self, keyring=None): return ("gpg --list-secret-keys --with-colons --keyring {0} --no-default-keyring").format( keyring, ) + + +class GpgKeyrings(GpgFactBase): + """ + Returns information on all GPG keyrings found in specified directories. + + .. code:: python + + { + "/etc/apt/keyrings/docker.gpg": { + "format": "gpg", + "keys": {...} # Same format as GpgKeys fact + } + } + """ + + @override + def command(self, directories): + if isinstance(directories, str): + directories = [directories] + + search_locations = " ".join(f'"{d}"' for d in directories) + + # Generate a command that finds keyrings and lists their keys + # We'll use a shell script that outputs keyring path followed by key info + return ( + f"for keyring in $(find {search_locations} -type f \\( -name '*.gpg' " + f"-o -name '*.asc' -o -name '*.kbx' \\) 2>/dev/null); do " + f'echo "KEYRING:$keyring"; ' + f'if [[ "$keyring" == *.asc ]]; then ' + f'gpg --with-colons "$keyring" 2>/dev/null || true; ' + f"else " + f'gpg --list-keys --with-colons --keyring "$keyring" --no-default-keyring 2>/dev/null || true; ' # noqa: E501 + f"fi; done" + ) + + @override + def process(self, output): + keyrings = {} + current_keyring = None + current_output: list[str] = [] + + for line in output: + line = line.strip() + if not line: + continue + + if line.startswith("KEYRING:"): + # Process previous keyring if exists + if current_keyring and current_output: + keyring_format = current_keyring.split(".")[-1].lower() + keys = super().process(current_output) + keyrings[current_keyring] = {"format": keyring_format, "keys": keys} + + # Start new keyring + current_keyring = line[8:] # Remove "KEYRING:" prefix + current_output = [] + else: + # Accumulate GPG output for current keyring + current_output.append(line) + + # Process final keyring + if current_keyring and current_output: + keyring_format = current_keyring.split(".")[-1].lower() + keys = super().process(current_output) + keyrings[current_keyring] = {"format": keyring_format, "keys": keys} + + return keyrings diff --git a/src/pyinfra/operations/gpg.py b/src/pyinfra/operations/gpg.py new file mode 100644 index 000000000..922f62be6 --- /dev/null +++ b/src/pyinfra/operations/gpg.py @@ -0,0 +1,327 @@ +""" +Manage GPG keys and keyrings. +""" + +from pathlib import PurePosixPath +from urllib.parse import urlparse + +from pyinfra import host +from pyinfra.api import OperationError, operation +from pyinfra.facts.gpg import GpgKeyrings + +from . import files + + +@operation() +def key( + src: str | None = None, + dest: str | None = None, + keyserver: str | None = None, + keyid: str | list[str] | None = None, + dearmor: bool = True, + mode: str = "0644", + present: bool = True, + working_dirs: list[str] | None = None, +): + """ + Install or remove GPG keys from various sources. + + Args: + src: filename or URL to a key (ASCII .asc or binary .gpg) + dest: destination path for the key file (required for installation, optional for removal) + keyserver: keyserver URL for fetching keys by ID + keyid: key ID or list of key IDs (required with keyserver, optional for removal) + dearmor: whether to convert ASCII armored keys to binary format + mode: file permissions for the installed key + present: whether the key should be present (True) or absent (False) + working_dirs: dirs to search for existing keyrings (required for removal without dest) + When False: if dest is provided, removes from specific keyring; + if dest is None, removes from keyrings found in working_dirs; + if keyid is provided, removes specific key(s); + if keyid is None, removes entire keyring file(s) + + Examples: + gpg.key( + name="Install Docker GPG key", + src="https://download.docker.com/linux/debian/gpg", + dest="/etc/apt/keyrings/docker.gpg", + ) + + gpg.key( + name="Remove old GPG key file", + dest="/etc/apt/keyrings/old-key.gpg", + present=False, + ) + + gpg.key( + name="Remove specific key by ID", + dest="/etc/apt/keyrings/vendor.gpg", + keyid="0xABCDEF12", + present=False, + ) + + gpg.key( + name="Remove key from specific directories", + keyid="0xCOMPROMISED123", + present=False, + working_dirs=["/etc/apt/keyrings", "/usr/share/keyrings"], + ) + + gpg.key( + name="Fetch keys from keyserver", + keyserver="hkps://keyserver.ubuntu.com", + keyid=["0xD88E42B4", "0x7EA0A9C3"], + dest="/etc/apt/keyrings/vendor.gpg", + ) + """ + + # Validate parameters based on operation type + if present is True: + # For installation, dest is required + if not dest: + raise OperationError("`dest` must be provided for installation") + elif present is False: + # For removal, either dest or (keyid and working_dirs) must be provided + if not dest and not (keyid and working_dirs): + raise OperationError( + "For removal, either `dest` or both `keyid` and `working_dirs` must be provided" + ) + + # For removal, handle different scenarios + if present is False: + if not dest and keyid: + # Remove key(s) from all keyrings found in specified directories + if isinstance(keyid, str): + keyid = [keyid] + + if not working_dirs: + raise OperationError( + "`working_dirs` must be provided when removing keys without `dest`" + ) + + # Use the GpgKeyrings fact to find all keyrings in specified directories + keyrings_info = host.get_fact(GpgKeyrings, directories=working_dirs) + + for keyring_path, keyring_data in keyrings_info.items(): + # Get the keys from the GpgKeyrings fact data + keys_in_keyring = keyring_data.get("keys", {}) + + # Check if any of the target keys exist in this keyring + keys_to_remove = [] + for kid in keyid: + # Handle different key ID formats (short, long, with/without 0x prefix) + clean_key = kid.replace("0x", "").replace("0X", "").upper() + + # Check for exact match or if the key ID is a suffix/prefix of any key + # in the keyring + for existing_key_id in keys_in_keyring.keys(): + if ( + clean_key == existing_key_id.upper() + or existing_key_id.upper().endswith(clean_key) + or existing_key_id.upper().startswith(clean_key) + ): + keys_to_remove.append(existing_key_id) + + if keys_to_remove: + # Remove the entire keyring file if any target keys are found + # This is the safest approach for keyring management + yield from files.file._inner( + path=keyring_path, + present=False, + ) + + return + + elif dest and keyid: + # Remove specific key(s) from a specific keyring file + if isinstance(keyid, str): + keyid = [keyid] + + # Check if the destination keyring exists and contains the target keys + keyrings_info = host.get_fact( + GpgKeyrings, directories=[str(PurePosixPath(dest).parent)] + ) + + if dest in keyrings_info: + keyring_data = keyrings_info[dest] + keys_in_keyring = keyring_data.get("keys", {}) + + # Check if any of the target keys exist in this keyring + keys_found = False + for kid in keyid: + clean_key = kid.replace("0x", "").replace("0X", "").upper() + for existing_key_id in keys_in_keyring.keys(): + # Check for exact match, suffix (short key ID), or prefix match + if ( + clean_key == existing_key_id.upper() + or existing_key_id.upper().endswith(clean_key) + or existing_key_id.upper().startswith(clean_key) + ): + keys_found = True + break + if keys_found: + break + + if keys_found: + # Remove the entire keyring file - safest approach for keyring management + yield from files.file._inner( + path=dest, + present=False, + ) + return + + elif dest and not keyid: + # Remove entire keyring file + yield from files.file._inner( + path=dest, + present=False, + ) + return + + else: + raise OperationError("Invalid parameters for removal operation") + + # For installation, validate required parameters + if not src and not keyserver: + raise OperationError("Either `src` or `keyserver` must be provided for installation") + + if keyserver and not keyid: + raise OperationError("`keyid` must be provided with `keyserver`") + + if keyid and not keyserver and not src: + raise OperationError( + "When using `keyid` for installation, either `keyserver` or `src` must be provided" + ) + + # For installation (present=True), ensure destination directory exists + if dest is None: + raise OperationError("dest is required for installation") + + dest_dir = str(PurePosixPath(dest).parent) + yield from files.directory._inner( + path=dest_dir, + mode="0755", + present=True, + ) + + # --- src branch: install a key from URL or local file --- + if src: + if urlparse(src).scheme in ("http", "https"): + # Remote source: download first, then process + temp_file = host.get_temp_filename(src) + + yield from files.download._inner( + src=src, + dest=temp_file, + ) + + # Install the key and clean up temp file + yield from _install_key_file(temp_file, dest, dearmor, mode) + + # Clean up temp file using pyinfra + yield from files.file._inner( + path=temp_file, + present=False, + ) + else: + # Local file: install directly + yield from _install_key_file(src, dest, dearmor, mode) + + # --- keyserver branch: fetch keys by ID --- + if keyserver: + if keyid is None: + raise OperationError("`keyid` must be provided with `keyserver`") + + if isinstance(keyid, str): + keyid = [keyid] + + joined = " ".join(keyid) + + # Create temporary GPG home directory + temp_dir = f"/tmp/pyinfra-gpg-{host.get_temp_filename('')[-8:]}" + + yield from files.directory._inner( + path=temp_dir, + mode="0700", # GPG directories should be more restrictive + present=True, + ) + + # Export GNUPGHOME and fetch keys + yield f'export GNUPGHOME="{temp_dir}" && gpg --batch --keyserver "{keyserver}" --recv-keys {joined}' # noqa: E501 + + # Export keys to destination - always use direct binary export + # gpg --export produces binary format by default, no dearmoring needed + yield (f'export GNUPGHOME="{temp_dir}" && gpg --batch --export {joined} > "{dest}"') + + # Clean up temporary directory + yield from files.directory._inner( + path=temp_dir, + present=False, + ) + + # Set proper permissions + yield from files.file._inner( + path=dest, + mode=mode, + present=True, + ) + + +@operation() +def dearmor(src: str, dest: str, mode: str = "0644"): + """ + Convert ASCII armored GPG key to binary format. + + Args: + src: source ASCII armored key file + dest: destination binary key file + mode: file permissions for the output file + + Example: + gpg.dearmor( + name="Convert key to binary", + src="/tmp/key.asc", + dest="/etc/apt/keyrings/key.gpg", + ) + """ + + # Ensure destination directory exists + dest_dir = str(PurePosixPath(dest).parent) + yield from files.directory._inner( + path=dest_dir, + mode="0755", + present=True, + ) + + yield f'gpg --batch --dearmor -o "{dest}" "{src}"' + + # Set proper permissions + yield from files.file._inner( + path=dest, + mode=mode, + present=True, + ) + + +def _install_key_file(src_file: str, dest_path: str, dearmor: bool, mode: str): + """ + Helper function to install a GPG key file, dearmoring if necessary. + """ + if dearmor: + # Check if it's an ASCII armored key and handle accordingly + # Note: Could be enhanced using GpgKey fact + yield ( + f'if grep -q "BEGIN PGP PUBLIC KEY BLOCK" "{src_file}"; then ' + f'gpg --batch --dearmor -o "{dest_path}" "{src_file}"; ' + f'else cp "{src_file}" "{dest_path}"; fi' + ) + else: + # Simple copy for binary keys or when dearmoring is disabled + yield f'cp "{src_file}" "{dest_path}"' + + # Set proper permissions + yield from files.file._inner( + path=dest_path, + mode=mode, + present=True, + ) diff --git a/tests/facts/gpg.GpgKeyrings/asc_only.json b/tests/facts/gpg.GpgKeyrings/asc_only.json new file mode 100644 index 000000000..6804b7526 --- /dev/null +++ b/tests/facts/gpg.GpgKeyrings/asc_only.json @@ -0,0 +1,59 @@ +{ + "command": "for keyring in $(find \"/etc/apt/trusted.gpg.d\" -type f \\( -name '*.gpg' -o -name '*.asc' -o -name '*.kbx' \\) 2>/dev/null); do echo \"KEYRING:$keyring\"; if [[ \"$keyring\" == *.asc ]]; then gpg --with-colons \"$keyring\" 2>/dev/null || true; else gpg --list-keys --with-colons --keyring \"$keyring\" --no-default-keyring 2>/dev/null || true; fi; done", + "requires_command": "gpg", + "arg": ["/etc/apt/trusted.gpg.d"], + "output": [ + "KEYRING:/etc/apt/trusted.gpg.d/debian-archive-bookworm-automatic.asc", + "pub:-:4096:1:B7C5D7D6350947F8:1663251644:::-:::scSC::::::23::0:", + "fpr:::::::::4D64390375060AA4D3E84D8FB7C5D7D6350947F8:", + "uid:-::::1663251644::7B1EFED0F7198D8488DD53F0E95BB93FC4D0A10C::Debian Stable Release Key (12/bookworm) ::::::::::0:", + "sub:-:4096:1:16E0B0B5FC11C3A7:1663251644:::-:::e::::::23:", + "fpr:::::::::CFD35E5D1F5CBDE15AE59C5E16E0B0B5FC11C3A7:", + "KEYRING:/etc/apt/trusted.gpg.d/debian-archive-bookworm-stable.asc", + "pub:-:4096:1:4F368D5D67269BAC:1663251677:::-:::scSC::::::23::0:", + "fpr:::::::::80E46CDDD2798E51FB74F84E4F368D5D67269BAC:", + "uid:-::::1663251677::BB28BDBA35F9A1A92F44E4C96A9E86FC66F9D4E6::Debian Archive Automatic Signing Key (12/bookworm) ::::::::::0:", + "sub:-:4096:1:D7AB25251C0A3EE3:1663251677:::-:::e::::::23:", + "fpr:::::::::C5691F97B9C30A5CF0A3B1F9D7AB25251C0A3EE3:" + ], + "fact": { + "/etc/apt/trusted.gpg.d/debian-archive-bookworm-automatic.asc": { + "format": "asc", + "keys": { + "B7C5D7D6350947F8": { + "validity": "-", + "length": 4096, + "subkeys": { + "16E0B0B5FC11C3A7": { + "validity": "-", + "length": 4096, + "fingerprint": "CFD35E5D1F5CBDE15AE59C5E16E0B0B5FC11C3A7" + } + }, + "fingerprint": "4D64390375060AA4D3E84D8FB7C5D7D6350947F8", + "uid_hash": "7B1EFED0F7198D8488DD53F0E95BB93FC4D0A10C", + "uid": "Debian Stable Release Key (12/bookworm) " + } + } + }, + "/etc/apt/trusted.gpg.d/debian-archive-bookworm-stable.asc": { + "format": "asc", + "keys": { + "4F368D5D67269BAC": { + "validity": "-", + "length": 4096, + "subkeys": { + "D7AB25251C0A3EE3": { + "validity": "-", + "length": 4096, + "fingerprint": "C5691F97B9C30A5CF0A3B1F9D7AB25251C0A3EE3" + } + }, + "fingerprint": "80E46CDDD2798E51FB74F84E4F368D5D67269BAC", + "uid_hash": "BB28BDBA35F9A1A92F44E4C96A9E86FC66F9D4E6", + "uid": "Debian Archive Automatic Signing Key (12/bookworm) " + } + } + } + } +} diff --git a/tests/facts/gpg.GpgKeyrings/custom_directory.json b/tests/facts/gpg.GpgKeyrings/custom_directory.json new file mode 100644 index 000000000..884ce59e0 --- /dev/null +++ b/tests/facts/gpg.GpgKeyrings/custom_directory.json @@ -0,0 +1,26 @@ +{ + "command": "for keyring in $(find \"/custom/keyring/path\" -type f \\( -name '*.gpg' -o -name '*.asc' -o -name '*.kbx' \\) 2>/dev/null); do echo \"KEYRING:$keyring\"; if [[ \"$keyring\" == *.asc ]]; then gpg --with-colons \"$keyring\" 2>/dev/null || true; else gpg --list-keys --with-colons --keyring \"$keyring\" --no-default-keyring 2>/dev/null || true; fi; done", + "requires_command": "gpg", + "arg": ["/custom/keyring/path"], + "output": [ + "KEYRING:/custom/keyring/path/custom.asc", + "pub:-:4096:1:9A3B3C4D5E6F7890:1622505600:::-:::scSC::::::23::0:", + "fpr:::::::::1234567890ABCDEF1234567890ABCDEF12345678:", + "uid:-::::1622505600::::Custom Key ::::::::::0:" + ], + "fact": { + "/custom/keyring/path/custom.asc": { + "format": "asc", + "keys": { + "9A3B3C4D5E6F7890": { + "validity": "-", + "length": 4096, + "subkeys": {}, + "fingerprint": "1234567890ABCDEF1234567890ABCDEF12345678", + "uid_hash": "", + "uid": "Custom Key " + } + } + } + } +} diff --git a/tests/facts/gpg.GpgKeyrings/default.json b/tests/facts/gpg.GpgKeyrings/default.json new file mode 100644 index 000000000..5bce9cf17 --- /dev/null +++ b/tests/facts/gpg.GpgKeyrings/default.json @@ -0,0 +1,42 @@ +{ + "command": "for keyring in $(find \"/etc/apt/trusted.gpg.d\" \"/etc/apt/keyrings\" \"/usr/share/keyrings\" -type f \\( -name '*.gpg' -o -name '*.asc' -o -name '*.kbx' \\) 2>/dev/null); do echo \"KEYRING:$keyring\"; if [[ \"$keyring\" == *.asc ]]; then gpg --with-colons \"$keyring\" 2>/dev/null || true; else gpg --list-keys --with-colons --keyring \"$keyring\" --no-default-keyring 2>/dev/null || true; fi; done", + "requires_command": "gpg", + "arg": [["/etc/apt/trusted.gpg.d", "/etc/apt/keyrings", "/usr/share/keyrings"]], + "output": [ + "KEYRING:/etc/apt/keyrings/docker.gpg", + "tru:t:1:1601454628:0:3:1:5", + "pub:-:4096:1:ABCD1234EFGH5678:1336770936:::-:::scSC::::::23::0:", + "fpr:::::::::ABCD1234EFGH5678IJKL9012MNOP3456QRST7890:", + "uid:-::::1336770936::ABC123DEF456::Docker Release (CE deb) ::::::::::0:", + "KEYRING:/etc/apt/trusted.gpg.d/ubuntu-keyring.asc", + "pub:-:4096:1:1234ABCD5678EFGH:1336774248:::-:::scSC::::::23::0:", + "uid:-::::1336774248::::Ubuntu Archive Signing Key ::::::::::0:" + ], + "fact": { + "/etc/apt/keyrings/docker.gpg": { + "format": "gpg", + "keys": { + "ABCD1234EFGH5678": { + "validity": "-", + "length": 4096, + "subkeys": {}, + "fingerprint": "ABCD1234EFGH5678IJKL9012MNOP3456QRST7890", + "uid_hash": "ABC123DEF456", + "uid": "Docker Release (CE deb) " + } + } + }, + "/etc/apt/trusted.gpg.d/ubuntu-keyring.asc": { + "format": "asc", + "keys": { + "1234ABCD5678EFGH": { + "validity": "-", + "length": 4096, + "subkeys": {}, + "uid_hash": "", + "uid": "Ubuntu Archive Signing Key " + } + } + } + } +} diff --git a/tests/facts/gpg.GpgKeyrings/empty_directory.json b/tests/facts/gpg.GpgKeyrings/empty_directory.json new file mode 100644 index 000000000..51f1c4eaf --- /dev/null +++ b/tests/facts/gpg.GpgKeyrings/empty_directory.json @@ -0,0 +1,7 @@ +{ + "command": "for keyring in $(find \"/empty/directory\" -type f \\( -name '*.gpg' -o -name '*.asc' -o -name '*.kbx' \\) 2>/dev/null); do echo \"KEYRING:$keyring\"; if [[ \"$keyring\" == *.asc ]]; then gpg --with-colons \"$keyring\" 2>/dev/null || true; else gpg --list-keys --with-colons --keyring \"$keyring\" --no-default-keyring 2>/dev/null || true; fi; done", + "requires_command": "gpg", + "arg": ["/empty/directory"], + "output": [], + "fact": {} +} diff --git a/tests/facts/gpg.GpgKeyrings/mixed_formats.json b/tests/facts/gpg.GpgKeyrings/mixed_formats.json new file mode 100644 index 000000000..ea649e7a2 --- /dev/null +++ b/tests/facts/gpg.GpgKeyrings/mixed_formats.json @@ -0,0 +1,62 @@ +{ + "command": "for keyring in $(find \"/etc/apt/trusted.gpg.d\" -type f \\( -name '*.gpg' -o -name '*.asc' -o -name '*.kbx' \\) 2>/dev/null); do echo \"KEYRING:$keyring\"; if [[ \"$keyring\" == *.asc ]]; then gpg --with-colons \"$keyring\" 2>/dev/null || true; else gpg --list-keys --with-colons --keyring \"$keyring\" --no-default-keyring 2>/dev/null || true; fi; done", + "requires_command": "gpg", + "arg": ["/etc/apt/trusted.gpg.d"], + "output": [ + "KEYRING:/etc/apt/trusted.gpg.d/debian-archive.gpg", + "tru:t:1:1601454628:0:3:1:5", + "pub:-:4096:1:73A4F27B8DD47936:1663251644:::-:::scSC::::::23::0:", + "fpr:::::::::4D64390375060AA4D3E84D8F73A4F27B8DD47936:", + "uid:-::::1663251644::7B1EFED0F7198D8488DD53F0E95BB93FC4D0A10C::Debian Archive Automatic Signing Key (11/bullseye) ::::::::::0:", + "KEYRING:/etc/apt/trusted.gpg.d/ubuntu-keyring.asc", + "pub:-:4096:1:1E9377A2BA9EF27F:1336770936:::-:::scSC::::::23::0:", + "fpr:::::::::790BC7277767219C42C86F931E9377A2BA9EF27F:", + "uid:-::::1336770936::::Ubuntu Archive Signing Key ::::::::::0:", + "KEYRING:/etc/apt/trusted.gpg.d/docker.kbx", + "tru:t:1:1601454628:0:3:1:5", + "pub:-:4096:1:9DC858229FC7DD38:1498167007:::-:::scSC::::::23::0:", + "fpr:::::::::9DC858229FC7DD38B3088BD89DC858229FC7DD38:", + "uid:-::::1498167007::::Docker Release (CE deb) ::::::::::0:" + ], + "fact": { + "/etc/apt/trusted.gpg.d/debian-archive.gpg": { + "format": "gpg", + "keys": { + "73A4F27B8DD47936": { + "validity": "-", + "length": 4096, + "subkeys": {}, + "fingerprint": "4D64390375060AA4D3E84D8F73A4F27B8DD47936", + "uid_hash": "7B1EFED0F7198D8488DD53F0E95BB93FC4D0A10C", + "uid": "Debian Archive Automatic Signing Key (11/bullseye) " + } + } + }, + "/etc/apt/trusted.gpg.d/ubuntu-keyring.asc": { + "format": "asc", + "keys": { + "1E9377A2BA9EF27F": { + "validity": "-", + "length": 4096, + "subkeys": {}, + "fingerprint": "790BC7277767219C42C86F931E9377A2BA9EF27F", + "uid_hash": "", + "uid": "Ubuntu Archive Signing Key " + } + } + }, + "/etc/apt/trusted.gpg.d/docker.kbx": { + "format": "kbx", + "keys": { + "9DC858229FC7DD38": { + "validity": "-", + "length": 4096, + "subkeys": {}, + "fingerprint": "9DC858229FC7DD38B3088BD89DC858229FC7DD38", + "uid_hash": "", + "uid": "Docker Release (CE deb) " + } + } + } + } +} diff --git a/tests/operations/gpg.dearmor/basic.json b/tests/operations/gpg.dearmor/basic.json new file mode 100644 index 000000000..62fa745e4 --- /dev/null +++ b/tests/operations/gpg.dearmor/basic.json @@ -0,0 +1,22 @@ +{ + "args": [], + "kwargs": { + "src": "/tmp/test.asc", + "dest": "/tmp/test.gpg" + }, + "facts": { + "files.Directory": { + "path=/tmp": { + "mode": 755 + } + }, + "files.File": { + "path=/tmp/test.gpg": null + } + }, + "commands": [ + "gpg --batch --dearmor -o \"/tmp/test.gpg\" \"/tmp/test.asc\"", + "touch /tmp/test.gpg", + "chmod 644 /tmp/test.gpg" + ] +} diff --git a/tests/operations/gpg.dearmor/custom_mode.json b/tests/operations/gpg.dearmor/custom_mode.json new file mode 100644 index 000000000..48e3f6415 --- /dev/null +++ b/tests/operations/gpg.dearmor/custom_mode.json @@ -0,0 +1,24 @@ +{ + "args": [], + "kwargs": { + "src": "/tmp/key.asc", + "dest": "/etc/apt/keyrings/key.gpg", + "mode": "0600" + }, + "facts": { + "files.Directory": { + "path=/etc/apt/keyrings": null + }, + "files.File": { + "path=/etc/apt/keyrings/key.gpg": null + } + }, + "commands": [ + "mkdir -p /etc/apt/keyrings", + "chmod 755 /etc/apt/keyrings", + "gpg --batch --dearmor -o \"/etc/apt/keyrings/key.gpg\" \"/tmp/key.asc\"", + "mkdir -p /etc/apt/keyrings", + "touch /etc/apt/keyrings/key.gpg", + "chmod 600 /etc/apt/keyrings/key.gpg" + ] +} diff --git a/tests/operations/gpg.key/custom_mode.json b/tests/operations/gpg.key/custom_mode.json new file mode 100644 index 000000000..2de7e69f0 --- /dev/null +++ b/tests/operations/gpg.key/custom_mode.json @@ -0,0 +1,24 @@ +{ + "args": [], + "kwargs": { + "src": "/tmp/local-key.asc", + "dest": "/etc/apt/keyrings/test.gpg", + "mode": "0600" + }, + "facts": { + "files.Directory": { + "path=/etc/apt/keyrings": null + }, + "files.File": { + "path=/etc/apt/keyrings/test.gpg": null + } + }, + "commands": [ + "mkdir -p /etc/apt/keyrings", + "chmod 755 /etc/apt/keyrings", + "if grep -q \"BEGIN PGP PUBLIC KEY BLOCK\" \"/tmp/local-key.asc\"; then gpg --batch --dearmor -o \"/etc/apt/keyrings/test.gpg\" \"/tmp/local-key.asc\"; else cp \"/tmp/local-key.asc\" \"/etc/apt/keyrings/test.gpg\"; fi", + "mkdir -p /etc/apt/keyrings", + "touch /etc/apt/keyrings/test.gpg", + "chmod 600 /etc/apt/keyrings/test.gpg" + ] +} diff --git a/tests/operations/gpg.key/keyserver_multiple.json b/tests/operations/gpg.key/keyserver_multiple.json new file mode 100644 index 000000000..571a57866 --- /dev/null +++ b/tests/operations/gpg.key/keyserver_multiple.json @@ -0,0 +1,28 @@ +{ + "args": [], + "kwargs": { + "keyserver": "hkps://keyserver.ubuntu.com", + "keyid": ["0xD88E42B4", "0x7EA0A9C3"], + "dest": "/etc/apt/keyrings/vendor.gpg" + }, + "facts": { + "files.Directory": { + "path=/etc/apt/keyrings": null, + "path=/tmp/pyinfra-gpg-empfile_": null + }, + "files.File": { + "path=/etc/apt/keyrings/vendor.gpg": null + } + }, + "commands": [ + "mkdir -p /etc/apt/keyrings", + "chmod 755 /etc/apt/keyrings", + "mkdir -p /tmp/pyinfra-gpg-empfile_", + "chmod 700 /tmp/pyinfra-gpg-empfile_", + "export GNUPGHOME=\"/tmp/pyinfra-gpg-empfile_\" && gpg --batch --keyserver \"hkps://keyserver.ubuntu.com\" --recv-keys 0xD88E42B4 0x7EA0A9C3", + "export GNUPGHOME=\"/tmp/pyinfra-gpg-empfile_\" && gpg --batch --export 0xD88E42B4 0x7EA0A9C3 > \"/etc/apt/keyrings/vendor.gpg\"", + "mkdir -p /etc/apt/keyrings", + "touch /etc/apt/keyrings/vendor.gpg", + "chmod 644 /etc/apt/keyrings/vendor.gpg" + ] +} diff --git a/tests/operations/gpg.key/keyserver_no_keyid.json b/tests/operations/gpg.key/keyserver_no_keyid.json new file mode 100644 index 000000000..f6bbf5bbe --- /dev/null +++ b/tests/operations/gpg.key/keyserver_no_keyid.json @@ -0,0 +1,12 @@ +{ + "args": [], + "kwargs": { + "keyserver": "hkps://keyserver.ubuntu.com", + "dest": "/etc/apt/keyrings/test.gpg" + }, + "exception": { + "names": ["OperationError"], + "message": "`keyid` must be provided with `keyserver`" + }, + "facts": {} +} diff --git a/tests/operations/gpg.key/keyserver_single.json b/tests/operations/gpg.key/keyserver_single.json new file mode 100644 index 000000000..26fcb4e7c --- /dev/null +++ b/tests/operations/gpg.key/keyserver_single.json @@ -0,0 +1,28 @@ +{ + "args": [], + "kwargs": { + "keyserver": "hkps://keyserver.ubuntu.com", + "keyid": ["0xD88E42B4"], + "dest": "/etc/apt/keyrings/vendor.gpg" + }, + "facts": { + "files.Directory": { + "path=/etc/apt/keyrings": null, + "path=/tmp/pyinfra-gpg-empfile_": null + }, + "files.File": { + "path=/etc/apt/keyrings/vendor.gpg": null + } + }, + "commands": [ + "mkdir -p /etc/apt/keyrings", + "chmod 755 /etc/apt/keyrings", + "mkdir -p /tmp/pyinfra-gpg-empfile_", + "chmod 700 /tmp/pyinfra-gpg-empfile_", + "export GNUPGHOME=\"/tmp/pyinfra-gpg-empfile_\" && gpg --batch --keyserver \"hkps://keyserver.ubuntu.com\" --recv-keys 0xD88E42B4", + "export GNUPGHOME=\"/tmp/pyinfra-gpg-empfile_\" && gpg --batch --export 0xD88E42B4 > \"/etc/apt/keyrings/vendor.gpg\"", + "mkdir -p /etc/apt/keyrings", + "touch /etc/apt/keyrings/vendor.gpg", + "chmod 644 /etc/apt/keyrings/vendor.gpg" + ] +} diff --git a/tests/operations/gpg.key/local_file.json b/tests/operations/gpg.key/local_file.json new file mode 100644 index 000000000..ba86d06fa --- /dev/null +++ b/tests/operations/gpg.key/local_file.json @@ -0,0 +1,23 @@ +{ + "args": [], + "kwargs": { + "src": "/tmp/local-key.asc", + "dest": "/etc/apt/keyrings/test.gpg" + }, + "facts": { + "files.Directory": { + "path=/etc/apt/keyrings": null + }, + "files.File": { + "path=/etc/apt/keyrings/test.gpg": null + } + }, + "commands": [ + "mkdir -p /etc/apt/keyrings", + "chmod 755 /etc/apt/keyrings", + "if grep -q \"BEGIN PGP PUBLIC KEY BLOCK\" \"/tmp/local-key.asc\"; then gpg --batch --dearmor -o \"/etc/apt/keyrings/test.gpg\" \"/tmp/local-key.asc\"; else cp \"/tmp/local-key.asc\" \"/etc/apt/keyrings/test.gpg\"; fi", + "mkdir -p /etc/apt/keyrings", + "touch /etc/apt/keyrings/test.gpg", + "chmod 644 /etc/apt/keyrings/test.gpg" + ] +} diff --git a/tests/operations/gpg.key/no_dearmor.json b/tests/operations/gpg.key/no_dearmor.json new file mode 100644 index 000000000..4095c6d05 --- /dev/null +++ b/tests/operations/gpg.key/no_dearmor.json @@ -0,0 +1,24 @@ +{ + "args": [], + "kwargs": { + "src": "/tmp/local-key.gpg", + "dest": "/etc/apt/keyrings/test.gpg", + "dearmor": false + }, + "facts": { + "files.Directory": { + "path=/etc/apt/keyrings": null + }, + "files.File": { + "path=/etc/apt/keyrings/test.gpg": null + } + }, + "commands": [ + "mkdir -p /etc/apt/keyrings", + "chmod 755 /etc/apt/keyrings", + "cp \"/tmp/local-key.gpg\" \"/etc/apt/keyrings/test.gpg\"", + "mkdir -p /etc/apt/keyrings", + "touch /etc/apt/keyrings/test.gpg", + "chmod 644 /etc/apt/keyrings/test.gpg" + ] +} diff --git a/tests/operations/gpg.key/no_dest.json b/tests/operations/gpg.key/no_dest.json new file mode 100644 index 000000000..134562316 --- /dev/null +++ b/tests/operations/gpg.key/no_dest.json @@ -0,0 +1,11 @@ +{ + "args": [], + "kwargs": { + "src": "/tmp/test.asc" + }, + "facts": {}, + "exception": { + "names": ["OperationError"], + "message": "`dest` must be provided for installation" + } +} diff --git a/tests/operations/gpg.key/no_src_or_keyserver.json b/tests/operations/gpg.key/no_src_or_keyserver.json new file mode 100644 index 000000000..786d22d1c --- /dev/null +++ b/tests/operations/gpg.key/no_src_or_keyserver.json @@ -0,0 +1,11 @@ +{ + "args": [], + "kwargs": { + "dest": "/etc/apt/keyrings/test.gpg" + }, + "exception": { + "names": ["OperationError"], + "message": "Either `src` or `keyserver` must be provided for installation" + }, + "facts": {} +} diff --git a/tests/operations/gpg.key/remote_url.json b/tests/operations/gpg.key/remote_url.json new file mode 100644 index 000000000..509887e86 --- /dev/null +++ b/tests/operations/gpg.key/remote_url.json @@ -0,0 +1,29 @@ +{ + "args": [], + "kwargs": { + "src": "https://example.com/key.asc", + "dest": "/etc/apt/keyrings/test.gpg" + }, + "facts": { + "files.Directory": { + "path=/etc/apt/keyrings": null + }, + "files.File": { + "path=/etc/apt/keyrings/test.gpg": null, + "path=_tempfile_": null + }, + "server.Which": { + "command=curl": "/usr/bin/curl" + } + }, + "commands": [ + "mkdir -p /etc/apt/keyrings", + "chmod 755 /etc/apt/keyrings", + "curl -sSLf https://example.com/key.asc -o _tempfile_", + "mv _tempfile_ _tempfile_", + "if grep -q \"BEGIN PGP PUBLIC KEY BLOCK\" \"_tempfile_\"; then gpg --batch --dearmor -o \"/etc/apt/keyrings/test.gpg\" \"_tempfile_\"; else cp \"_tempfile_\" \"/etc/apt/keyrings/test.gpg\"; fi", + "mkdir -p /etc/apt/keyrings", + "touch /etc/apt/keyrings/test.gpg", + "chmod 644 /etc/apt/keyrings/test.gpg" + ] +} diff --git a/tests/operations/gpg.key/remove_by_id.json b/tests/operations/gpg.key/remove_by_id.json new file mode 100644 index 000000000..8d6e7eb98 --- /dev/null +++ b/tests/operations/gpg.key/remove_by_id.json @@ -0,0 +1,32 @@ +{ + "kwargs": { + "dest": "/etc/apt/keyrings/vendor.gpg", + "keyid": "0xABCDEF12", + "present": false + }, + "facts": { + "gpg.GpgKeyrings": { + "directories=['/etc/apt/keyrings']": { + "/etc/apt/keyrings/vendor.gpg": { + "format": "gpg", + "keys": { + "ABCDEF1234567890": { + "validity": "-", + "length": 4096, + "subkeys": {}, + "fingerprint": "ABCDEF1234567890FEDCBA0987654321ABCDEF12", + "uid_hash": "ABC123DEF456", + "uid": "Vendor Key " + } + } + } + } + }, + "files.File": { + "path=/etc/apt/keyrings/vendor.gpg": {"mode": 644} + } + }, + "commands": [ + "rm -f /etc/apt/keyrings/vendor.gpg" + ] +} diff --git a/tests/operations/gpg.key/remove_file.json b/tests/operations/gpg.key/remove_file.json new file mode 100644 index 000000000..ac9adcb9e --- /dev/null +++ b/tests/operations/gpg.key/remove_file.json @@ -0,0 +1,14 @@ +{ + "kwargs": { + "dest": "/etc/apt/keyrings/test.gpg", + "present": false + }, + "facts": { + "files.File": { + "path=/etc/apt/keyrings/test.gpg": {"mode": 644} + } + }, + "commands": [ + "rm -f /etc/apt/keyrings/test.gpg" + ] +} diff --git a/tests/operations/gpg.key/remove_from_all_keyrings.json b/tests/operations/gpg.key/remove_from_all_keyrings.json new file mode 100644 index 000000000..62a932659 --- /dev/null +++ b/tests/operations/gpg.key/remove_from_all_keyrings.json @@ -0,0 +1,32 @@ +{ + "kwargs": { + "keyid": "0xCOMPROMISED123", + "present": false, + "working_dirs": ["/etc/apt/trusted.gpg.d", "/etc/apt/keyrings", "/usr/share/keyrings"] + }, + "facts": { + "gpg.GpgKeyrings": { + "directories=['/etc/apt/trusted.gpg.d', '/etc/apt/keyrings', '/usr/share/keyrings']": { + "/etc/apt/trusted.gpg.d/compromised.gpg": { + "format": "gpg", + "keys": { + "COMPROMISED123567890": { + "validity": "-", + "length": 4096, + "subkeys": {}, + "fingerprint": "COMPROMISED123567890FEDCBA0987654321COMPROMISED123", + "uid_hash": "ABC123DEF456", + "uid": "Compromised Key " + } + } + } + } + }, + "files.File": { + "path=/etc/apt/trusted.gpg.d/compromised.gpg": {"mode": 644} + } + }, + "commands": [ + "rm -f /etc/apt/trusted.gpg.d/compromised.gpg" + ] +} diff --git a/tests/operations/gpg.key/remove_no_dest_no_keyid.json b/tests/operations/gpg.key/remove_no_dest_no_keyid.json new file mode 100644 index 000000000..03225013d --- /dev/null +++ b/tests/operations/gpg.key/remove_no_dest_no_keyid.json @@ -0,0 +1,10 @@ +{ + "kwargs": { + "present": false + }, + "exception": { + "names": ["OperationError"], + "message": "For removal, either `dest` or both `keyid` and `working_dirs` must be provided" + }, + "facts": {} +} From d7bb770b0e6eb5369a2fd5113e772d6414bbdbbf Mon Sep 17 00:00:00 2001 From: Simon Maillard Date: Thu, 16 Oct 2025 08:04:33 +0000 Subject: [PATCH 3/5] facts: add deb822 format support for APT sources - Add AptRepo and AptSourcesFile dataclasses for type-safe repository handling - Support both legacy .list and modern .sources deb822 formats - Maintain backward compatibility with dict-like access patterns - Add comprehensive parsing for deb822 multi-value fields - Expand deb822 entries to individual repository configurations - Add test coverage for deb822 sources and components with numbers - Update existing tests to handle new file-marker based parsing --- src/pyinfra/facts/apt.py | 305 ++++++++++++++++-- src/pyinfra/operations/gpg.py | 2 +- .../apt.AptSources/component_with_number.json | 6 +- .../facts/apt.AptSources/deb822_sources.json | 48 +++ tests/facts/apt.AptSources/sources.json | 4 +- 5 files changed, 331 insertions(+), 34 deletions(-) create mode 100644 tests/facts/apt.AptSources/deb822_sources.json diff --git a/src/pyinfra/facts/apt.py b/src/pyinfra/facts/apt.py index a427f9e17..d3aa52bfd 100644 --- a/src/pyinfra/facts/apt.py +++ b/src/pyinfra/facts/apt.py @@ -1,13 +1,206 @@ from __future__ import annotations import re +from dataclasses import dataclass +from typing import Union from typing_extensions import TypedDict, override from pyinfra.api import FactBase from .gpg import GpgFactBase -from .util import make_cat_files_command + + +@dataclass +class AptRepo: + """Represents an APT repository configuration. + + This dataclass provides type safety for APT repository definitions, + supporting both legacy .list style and modern deb822 .sources formats. + + Provides dict-like access for backward compatibility while offering + full type safety for modern code. + """ + + type: str # "deb" or "deb-src" + url: str # Repository URL + distribution: str # Suite/distribution name + components: list[str] # List of components (e.g., ["main", "contrib"]) + options: dict[str, Union[str, list[str]]] # Repository options + + # Dict-like interface for backward compatibility + def __getitem__(self, key: str): + """Dict-like access: repo['type'] works like repo.type""" + return getattr(self, key) + + def __setitem__(self, key: str, value): + """Dict-like assignment: repo['type'] = 'deb' works like repo.type = 'deb'""" + setattr(self, key, value) + + def __contains__(self, key: str) -> bool: + """Support 'key' in repo syntax""" + return hasattr(self, key) + + def get(self, key: str, default=None): + """Dict-like get: repo.get('type', 'deb')""" + return getattr(self, key, default) + + def keys(self): + """Return dict-like keys""" + return ["type", "url", "distribution", "components", "options"] + + def values(self): + """Return dict-like values""" + return [self.type, self.url, self.distribution, self.components, self.options] + + def items(self): + """Return dict-like items""" + return [(k, getattr(self, k)) for k in self.keys()] + + @override + def __eq__(self, other) -> bool: + """Enhanced equality that works with dicts and AptRepo instances""" + if isinstance(other, dict): + return ( + self.type == other.get("type") + and self.url == other.get("url") + and self.distribution == other.get("distribution") + and self.components == other.get("components") + and self.options == other.get("options") + ) + elif isinstance(other, AptRepo): + return ( + self.type == other.type + and self.url == other.url + and self.distribution == other.distribution + and self.components == other.components + and self.options == other.options + ) + return False + + def to_json(self): + """Convert to dict for JSON serialization""" + return { + "type": self.type, + "url": self.url, + "distribution": self.distribution, + "components": self.components, + "options": self.options, + } + + +@dataclass +class AptSourcesFile: + """Represents a deb822 sources file entry before expansion into individual repositories. + + This preserves the original multi-value fields from deb822 format, + while AptRepo represents individual expanded repositories. + """ + + types: list[str] # ["deb", "deb-src"] + uris: list[str] # ["http://deb.debian.org", "https://mirror.example.com"] + suites: list[str] # ["bookworm", "bullseye"] + components: list[str] # ["main", "contrib", "non-free"] + architectures: list[str] | None = None # ["amd64", "i386"] + signed_by: list[str] | None = None # ["/path/to/key1.gpg", "/path/to/key2.gpg"] + trusted: str | None = None # "yes"/"no" + + @classmethod + def from_deb822_lines(cls, lines: list[str]) -> "AptSourcesFile | None": + """Parse deb822 stanza lines into AptSourcesFile. + + Returns None if parsing failed or repository is disabled. + """ + if not lines: + return None + + data: dict[str, str] = {} + for line in lines: + if not line or line.startswith("#"): + continue + # Field-Name: value + try: + key, value = line.split(":", 1) + except ValueError: # malformed line + continue + data[key.strip()] = value.strip() + + # Validate required fields + required = ("Types", "URIs", "Suites") + if not all(field in data for field in required): + return None + + # Filter out disabled repositories + enabled_str = data.get("Enabled", "yes").lower() + if enabled_str != "yes": + return None + + # Parse fields into appropriate types + return cls( + types=data.get("Types", "").split(), + uris=data.get("URIs", "").split(), + suites=data.get("Suites", "").split(), + components=data.get("Components", "").split(), + architectures=( + data.get("Architectures", "").split() if data.get("Architectures") else None + ), + signed_by=data.get("Signed-By", "").split() if data.get("Signed-By") else None, + trusted=data.get("Trusted", "").lower() if data.get("Trusted") else None, + ) + + @classmethod + def parse_sources_file(cls, lines: list[str]) -> list[AptRepo]: + """Parse a full deb822 .sources file into AptRepo instances. + + Splits on blank lines into stanzas and parses each one. + Returns a combined list of AptRepo instances for all stanzas. + + Args: + lines: Lines from a .sources file + """ + repos = [] + stanza: list[str] = [] + for raw in lines + [""]: # sentinel blank line to flush last stanza + line = raw.rstrip("\n") + if line.strip() == "": + if stanza: + sources_file = cls.from_deb822_lines(stanza) + if sources_file: + repos.extend(sources_file.expand_to_repos()) + stanza = [] + continue + stanza.append(line) + return repos + + def expand_to_repos(self) -> list[AptRepo]: + """Expand this sources file entry into individual AptRepo instances.""" + # Build options dict in the same format as legacy parsing + options: dict[str, Union[str, list[str]]] = {} + + if self.architectures: + options["arch"] = ( + self.architectures if len(self.architectures) > 1 else self.architectures[0] + ) + if self.signed_by: + options["signed-by"] = self.signed_by if len(self.signed_by) > 1 else self.signed_by[0] + if self.trusted: + options["trusted"] = self.trusted + + repos = [] + # Produce combinations – in most real-world cases these will each be one. + for repo_type in self.types: + for uri in self.uris: + for suite in self.suites: + repos.append( + AptRepo( + type=repo_type, + url=uri, + distribution=suite, + components=self.components.copy(), # copy to avoid shared reference + options=dict(options), # copy per entry + ) + ) + return repos def noninteractive_apt(command: str, force=False): @@ -32,13 +225,21 @@ def noninteractive_apt(command: str, force=False): ) -def parse_apt_repo(name): +def parse_apt_repo(name: str) -> AptRepo | None: + """Parse a traditional apt source line into an AptRepo. + + Args: + name: Apt source line (e.g., "deb [arch=amd64] http://example.com focal main") + + Returns: + AptRepo instance or None if parsing failed + """ regex = r"^(deb(?:-src)?)(?:\s+\[([^\]]+)\])?\s+([^\s]+)\s+([^\s]+)\s+([a-z-\s\d]*)$" matches = re.match(regex, name) if not matches: - return + return None # Parse any options options = {} @@ -51,53 +252,97 @@ def parse_apt_repo(name): options[key] = value - return { - "options": options, - "type": matches.group(1), - "url": matches.group(3), - "distribution": matches.group(4), - "components": list(matches.group(5).split()), - } + return AptRepo( + type=matches.group(1), + url=matches.group(3), + distribution=matches.group(4), + components=list(matches.group(5).split()), + options=options, + ) -class AptSources(FactBase): +def parse_apt_list_file(lines: list[str]) -> list[AptRepo]: + """Parse legacy .list style apt source file. + + Each non-comment, non-empty line is a discrete repository definition in the + traditional ``deb http://... suite components`` syntax. + Returns a list of AptRepo instances. + + Args: + lines: Lines from a .list file """ - Returns a list of installed apt sources: + repos = [] + for raw in lines: + line = raw.strip() + if not line or line.startswith("#"): + continue + repo = parse_apt_repo(line) + if repo: + repos.append(repo) + return repos - .. code:: python - [ - { - "type": "deb", - "url": "http://archive.ubuntu.org", - "distribution": "trusty", - "components", ["main", "multiverse"], - }, - ] +class AptSources(FactBase): + """Returns a list of installed apt sources (legacy .list + deb822 .sources). + + Returns a list of AptRepo instances that behave like dicts for backward compatibility: + + [AptRepo(type="deb", url="http://archive.ubuntu.org", ...)] + + Each AptRepo can be accessed like a dict: + repo['type'] # works like repo.type + repo.get('url') # works like getattr(repo, 'url') """ @override def command(self) -> str: - return make_cat_files_command( - "/etc/apt/sources.list", - "/etc/apt/sources.list.d/*.list", + # We emit file boundary markers so the parser can select the correct + # parsing function based on filename extension. + return ( + "sh -c '" + "for f in " + "/etc/apt/sources.list " + "/etc/apt/sources.list.d/*.list " + "/etc/apt/sources.list.d/*.sources; do " + '[ -e "$f" ] || continue; ' + 'echo "##FILE $f"; ' + 'cat "$f"; ' + "echo; " + "done'" ) @override def requires_command(self) -> str: - return "apt" # if apt installed, above should exist + return "apt" default = list @override - def process(self, output): - repos = [] + def process(self, output): # type: ignore[override] + repos: list[AptRepo] = [] + current_file: str | None = None + buffer: list[str] = [] + + def flush(): + nonlocal buffer, current_file, repos + if current_file is None or not buffer: + buffer = [] + return + + if current_file.endswith(".sources"): + repos.extend(AptSourcesFile.parse_sources_file(buffer)) + else: # .list files or /etc/apt/sources.list + repos.extend(parse_apt_list_file(buffer)) + buffer = [] for line in output: - repo = parse_apt_repo(line) - if repo: - repos.append(repo) + if line.startswith("##FILE "): + flush() # flush previous file buffer + current_file = line[7:].strip() # remove "##FILE " prefix + continue + buffer.append(line) + flush() # flush the final buffer return repos diff --git a/src/pyinfra/operations/gpg.py b/src/pyinfra/operations/gpg.py index 392493886..922f62be6 100644 --- a/src/pyinfra/operations/gpg.py +++ b/src/pyinfra/operations/gpg.py @@ -251,7 +251,7 @@ def key( # Export keys to destination - always use direct binary export # gpg --export produces binary format by default, no dearmoring needed - yield (f'export GNUPGHOME="{temp_dir}" && ' f'gpg --batch --export {joined} > "{dest}"') + yield (f'export GNUPGHOME="{temp_dir}" && gpg --batch --export {joined} > "{dest}"') # Clean up temporary directory yield from files.directory._inner( diff --git a/tests/facts/apt.AptSources/component_with_number.json b/tests/facts/apt.AptSources/component_with_number.json index 97b3989aa..1ab462434 100644 --- a/tests/facts/apt.AptSources/component_with_number.json +++ b/tests/facts/apt.AptSources/component_with_number.json @@ -1,8 +1,10 @@ { "output": [ - "deb http://archive.ubuntu.com/ubuntu trusty restricted pi4" + "##FILE /etc/apt/sources.list", + "deb http://archive.ubuntu.com/ubuntu trusty restricted pi4", + "" ], - "command": "(! test -f /etc/apt/sources.list || cat /etc/apt/sources.list) && (cat /etc/apt/sources.list.d/*.list || true)", + "command": "sh -c 'for f in /etc/apt/sources.list /etc/apt/sources.list.d/*.list /etc/apt/sources.list.d/*.sources; do [ -e \"$f\" ] || continue; echo \"##FILE $f\"; cat \"$f\"; echo; done'", "requires_command": "apt", "fact": [ { diff --git a/tests/facts/apt.AptSources/deb822_sources.json b/tests/facts/apt.AptSources/deb822_sources.json new file mode 100644 index 000000000..b4ced1bd4 --- /dev/null +++ b/tests/facts/apt.AptSources/deb822_sources.json @@ -0,0 +1,48 @@ +{ + "output": [ + "##FILE /etc/apt/sources.list.d/test.sources", + "Types: deb deb-src", + "URIs: http://deb.debian.org/debian", + "Suites: bookworm", + "Components: main contrib", + "Architectures: amd64", + "Signed-By: /usr/share/keyrings/debian-archive-keyring.gpg", + "", + "Types: deb", + "URIs: https://packages.microsoft.com/repos/code", + "Suites: stable", + "Components: main", + "" + ], + "command": "sh -c 'for f in /etc/apt/sources.list /etc/apt/sources.list.d/*.list /etc/apt/sources.list.d/*.sources; do [ -e \"$f\" ] || continue; echo \"##FILE $f\"; cat \"$f\"; echo; done'", + "requires_command": "apt", + "fact": [ + { + "type": "deb", + "url": "http://deb.debian.org/debian", + "distribution": "bookworm", + "components": ["main", "contrib"], + "options": { + "arch": "amd64", + "signed-by": "/usr/share/keyrings/debian-archive-keyring.gpg" + } + }, + { + "type": "deb-src", + "url": "http://deb.debian.org/debian", + "distribution": "bookworm", + "components": ["main", "contrib"], + "options": { + "arch": "amd64", + "signed-by": "/usr/share/keyrings/debian-archive-keyring.gpg" + } + }, + { + "type": "deb", + "url": "https://packages.microsoft.com/repos/code", + "distribution": "stable", + "components": ["main"], + "options": {} + } + ] +} diff --git a/tests/facts/apt.AptSources/sources.json b/tests/facts/apt.AptSources/sources.json index 2b5365f28..5c0eecf07 100644 --- a/tests/facts/apt.AptSources/sources.json +++ b/tests/facts/apt.AptSources/sources.json @@ -1,11 +1,13 @@ { "output": [ + "##FILE /etc/apt/sources.list", "deb http://archive.ubuntu.com/ubuntu trusty restricted", + "##FILE /etc/apt/sources.list.d/mongodb.list", "deb-src [arch=amd64,i386] http://archive.ubuntu.com/ubuntu trusty main", "deb [arch=amd64,arm64 signed-by=/usr/share/keyrings/mongodb-server-7.0.gpg] https://repo.mongodb.org/apt/ubuntu jammy/mongodb-org/7.0 multiverse", "nope" ], - "command": "(! test -f /etc/apt/sources.list || cat /etc/apt/sources.list) && (cat /etc/apt/sources.list.d/*.list || true)", + "command": "sh -c 'for f in /etc/apt/sources.list /etc/apt/sources.list.d/*.list /etc/apt/sources.list.d/*.sources; do [ -e \"$f\" ] || continue; echo \"##FILE $f\"; cat \"$f\"; echo; done'", "requires_command": "apt", "fact": [ { From b458fa4f4f6a59c62e88b75c09eaac4aca5148bc Mon Sep 17 00:00:00 2001 From: Simon Maillard Date: Thu, 16 Oct 2025 08:36:22 +0000 Subject: [PATCH 4/5] operations/gpg: refactor key() function for better maintainability - Break down complex key() function into smaller, focused helper functions - Add _install_key_from_src() for file/URL installation - Add _install_key_from_keyserver() for keyserver installation - Add _remove_key_from_keyrings() for multi-keyring removal - Add _remove_key_from_keyring() for single keyring removal - Add _remove_keyring_file() for complete keyring removal - Add validation helper functions for parameters - Improve code readability and maintainability - Follow Docker operations pattern for function organization - Maintain 100% backward compatibility with existing API --- src/pyinfra/operations/gpg.py | 348 ++++++++++++++++++---------------- 1 file changed, 188 insertions(+), 160 deletions(-) diff --git a/src/pyinfra/operations/gpg.py b/src/pyinfra/operations/gpg.py index 922f62be6..78f942ec4 100644 --- a/src/pyinfra/operations/gpg.py +++ b/src/pyinfra/operations/gpg.py @@ -12,6 +12,177 @@ from . import files +def _install_key_from_src(src: str, dest: str, dearmor: bool, mode: str): + """Install a GPG key from a file or URL.""" + if urlparse(src).scheme in ("http", "https"): + # Remote source: download first, then process + temp_file = host.get_temp_filename(src) + + yield from files.download._inner( + src=src, + dest=temp_file, + ) + + # Install the key and clean up temp file + yield from _install_key_file(temp_file, dest, dearmor, mode) + + # Clean up temp file using pyinfra + yield from files.file._inner( + path=temp_file, + present=False, + ) + else: + # Local file: install directly + yield from _install_key_file(src, dest, dearmor, mode) + + +def _install_key_from_keyserver(keyserver: str, keyid: str | list[str], dest: str, mode: str): + """Install GPG keys from a keyserver.""" + if isinstance(keyid, str): + keyid = [keyid] + + joined = " ".join(keyid) + + # Create temporary GPG home directory + temp_dir = f"/tmp/pyinfra-gpg-{host.get_temp_filename('')[-8:]}" + + yield from files.directory._inner( + path=temp_dir, + mode="0700", # GPG directories should be more restrictive + present=True, + ) + + # Export GNUPGHOME and fetch keys + yield f'export GNUPGHOME="{temp_dir}" && gpg --batch --keyserver "{keyserver}" --recv-keys {joined}' # noqa: E501 + + # Export keys to destination - always use direct binary export + # gpg --export produces binary format by default, no dearmoring needed + yield (f'export GNUPGHOME="{temp_dir}" && gpg --batch --export {joined} > "{dest}"') + + # Clean up temporary directory + yield from files.directory._inner( + path=temp_dir, + present=False, + ) + + # Set proper permissions + yield from files.file._inner( + path=dest, + mode=mode, + present=True, + ) + + +def _remove_key_from_keyrings(keyid: str | list[str], working_dirs: list[str]): + """Remove specific keys from all keyrings in specified directories.""" + if isinstance(keyid, str): + keyid = [keyid] + + # Use the GpgKeyrings fact to find all keyrings in specified directories + keyrings_info = host.get_fact(GpgKeyrings, directories=working_dirs) + + for keyring_path, keyring_data in keyrings_info.items(): + # Get the keys from the GpgKeyrings fact data + keys_in_keyring = keyring_data.get("keys", {}) + + # Check if any of the target keys exist in this keyring + keys_to_remove = [] + for kid in keyid: + # Handle different key ID formats (short, long, with/without 0x prefix) + clean_key = kid.replace("0x", "").replace("0X", "").upper() + + # Check for exact match or if the key ID is a suffix/prefix of any key + # in the keyring + for existing_key_id in keys_in_keyring.keys(): + if ( + clean_key == existing_key_id.upper() + or existing_key_id.upper().endswith(clean_key) + or existing_key_id.upper().startswith(clean_key) + ): + keys_to_remove.append(existing_key_id) + + if keys_to_remove: + # Remove the entire keyring file if any target keys are found + # This is the safest approach for keyring management + yield from files.file._inner( + path=keyring_path, + present=False, + ) + + +def _remove_key_from_keyring(keyid: str | list[str], dest: str): + """Remove specific keys from a specific keyring file.""" + if isinstance(keyid, str): + keyid = [keyid] + + # Check if the destination keyring exists and contains the target keys + keyrings_info = host.get_fact(GpgKeyrings, directories=[str(PurePosixPath(dest).parent)]) + + if dest in keyrings_info: + keyring_data = keyrings_info[dest] + keys_in_keyring = keyring_data.get("keys", {}) + + # Check if any of the target keys exist in this keyring + keys_found = False + for kid in keyid: + clean_key = kid.replace("0x", "").replace("0X", "").upper() + for existing_key_id in keys_in_keyring.keys(): + # Check for exact match, suffix (short key ID), or prefix match + if ( + clean_key == existing_key_id.upper() + or existing_key_id.upper().endswith(clean_key) + or existing_key_id.upper().startswith(clean_key) + ): + keys_found = True + break + if keys_found: + break + + if keys_found: + # Remove the entire keyring file - safest approach for keyring management + yield from files.file._inner( + path=dest, + present=False, + ) + + +def _remove_keyring_file(dest: str): + """Remove an entire keyring file.""" + yield from files.file._inner( + path=dest, + present=False, + ) + + +def _validate_installation_params( + src: str | None, keyserver: str | None, keyid: str | list[str] | None, dest: str | None +): + """Validate parameters for key installation.""" + if not src and not keyserver: + raise OperationError("Either `src` or `keyserver` must be provided for installation") + + if keyserver and not keyid: + raise OperationError("`keyid` must be provided with `keyserver`") + + if keyid and not keyserver and not src: + raise OperationError( + "When using `keyid` for installation, either `keyserver` or `src` must be provided" + ) + + if dest is None: + raise OperationError("`dest` must be provided for installation") + + +def _validate_removal_params( + dest: str | None, keyid: str | list[str] | None, working_dirs: list[str] | None +): + """Validate parameters for key removal.""" + if not dest and not (keyid and working_dirs): + raise OperationError( + "For removal, either `dest` or both `keyid` and `working_dirs` must be provided" + ) + + @operation() def key( src: str | None = None, @@ -75,128 +246,38 @@ def key( ) """ - # Validate parameters based on operation type - if present is True: - # For installation, dest is required - if not dest: - raise OperationError("`dest` must be provided for installation") - elif present is False: - # For removal, either dest or (keyid and working_dirs) must be provided - if not dest and not (keyid and working_dirs): - raise OperationError( - "For removal, either `dest` or both `keyid` and `working_dirs` must be provided" - ) - - # For removal, handle different scenarios + # Handle removal operations if present is False: + _validate_removal_params(dest, keyid, working_dirs) + if not dest and keyid: # Remove key(s) from all keyrings found in specified directories - if isinstance(keyid, str): - keyid = [keyid] - if not working_dirs: raise OperationError( "`working_dirs` must be provided when removing keys without `dest`" ) - - # Use the GpgKeyrings fact to find all keyrings in specified directories - keyrings_info = host.get_fact(GpgKeyrings, directories=working_dirs) - - for keyring_path, keyring_data in keyrings_info.items(): - # Get the keys from the GpgKeyrings fact data - keys_in_keyring = keyring_data.get("keys", {}) - - # Check if any of the target keys exist in this keyring - keys_to_remove = [] - for kid in keyid: - # Handle different key ID formats (short, long, with/without 0x prefix) - clean_key = kid.replace("0x", "").replace("0X", "").upper() - - # Check for exact match or if the key ID is a suffix/prefix of any key - # in the keyring - for existing_key_id in keys_in_keyring.keys(): - if ( - clean_key == existing_key_id.upper() - or existing_key_id.upper().endswith(clean_key) - or existing_key_id.upper().startswith(clean_key) - ): - keys_to_remove.append(existing_key_id) - - if keys_to_remove: - # Remove the entire keyring file if any target keys are found - # This is the safest approach for keyring management - yield from files.file._inner( - path=keyring_path, - present=False, - ) - - return + yield from _remove_key_from_keyrings(keyid, working_dirs) elif dest and keyid: # Remove specific key(s) from a specific keyring file - if isinstance(keyid, str): - keyid = [keyid] - - # Check if the destination keyring exists and contains the target keys - keyrings_info = host.get_fact( - GpgKeyrings, directories=[str(PurePosixPath(dest).parent)] - ) - - if dest in keyrings_info: - keyring_data = keyrings_info[dest] - keys_in_keyring = keyring_data.get("keys", {}) - - # Check if any of the target keys exist in this keyring - keys_found = False - for kid in keyid: - clean_key = kid.replace("0x", "").replace("0X", "").upper() - for existing_key_id in keys_in_keyring.keys(): - # Check for exact match, suffix (short key ID), or prefix match - if ( - clean_key == existing_key_id.upper() - or existing_key_id.upper().endswith(clean_key) - or existing_key_id.upper().startswith(clean_key) - ): - keys_found = True - break - if keys_found: - break - - if keys_found: - # Remove the entire keyring file - safest approach for keyring management - yield from files.file._inner( - path=dest, - present=False, - ) - return + yield from _remove_key_from_keyring(keyid, dest) elif dest and not keyid: # Remove entire keyring file - yield from files.file._inner( - path=dest, - present=False, - ) - return + yield from _remove_keyring_file(dest) else: raise OperationError("Invalid parameters for removal operation") - # For installation, validate required parameters - if not src and not keyserver: - raise OperationError("Either `src` or `keyserver` must be provided for installation") - - if keyserver and not keyid: - raise OperationError("`keyid` must be provided with `keyserver`") + return - if keyid and not keyserver and not src: - raise OperationError( - "When using `keyid` for installation, either `keyserver` or `src` must be provided" - ) + # Handle installation operations + _validate_installation_params(src, keyserver, keyid, dest) - # For installation (present=True), ensure destination directory exists - if dest is None: - raise OperationError("dest is required for installation") + # After validation, we know dest is not None for installation + assert dest is not None, "dest should not be None after validation" + # Ensure destination directory exists dest_dir = str(PurePosixPath(dest).parent) yield from files.directory._inner( path=dest_dir, @@ -204,67 +285,14 @@ def key( present=True, ) - # --- src branch: install a key from URL or local file --- + # Install from source (file or URL) if src: - if urlparse(src).scheme in ("http", "https"): - # Remote source: download first, then process - temp_file = host.get_temp_filename(src) - - yield from files.download._inner( - src=src, - dest=temp_file, - ) - - # Install the key and clean up temp file - yield from _install_key_file(temp_file, dest, dearmor, mode) - - # Clean up temp file using pyinfra - yield from files.file._inner( - path=temp_file, - present=False, - ) - else: - # Local file: install directly - yield from _install_key_file(src, dest, dearmor, mode) + yield from _install_key_from_src(src, dest, dearmor, mode) - # --- keyserver branch: fetch keys by ID --- + # Install from keyserver if keyserver: - if keyid is None: - raise OperationError("`keyid` must be provided with `keyserver`") - - if isinstance(keyid, str): - keyid = [keyid] - - joined = " ".join(keyid) - - # Create temporary GPG home directory - temp_dir = f"/tmp/pyinfra-gpg-{host.get_temp_filename('')[-8:]}" - - yield from files.directory._inner( - path=temp_dir, - mode="0700", # GPG directories should be more restrictive - present=True, - ) - - # Export GNUPGHOME and fetch keys - yield f'export GNUPGHOME="{temp_dir}" && gpg --batch --keyserver "{keyserver}" --recv-keys {joined}' # noqa: E501 - - # Export keys to destination - always use direct binary export - # gpg --export produces binary format by default, no dearmoring needed - yield (f'export GNUPGHOME="{temp_dir}" && gpg --batch --export {joined} > "{dest}"') - - # Clean up temporary directory - yield from files.directory._inner( - path=temp_dir, - present=False, - ) - - # Set proper permissions - yield from files.file._inner( - path=dest, - mode=mode, - present=True, - ) + assert keyid is not None, "keyid should not be None after validation" + yield from _install_key_from_keyserver(keyserver, keyid, dest, mode) @operation() From 0ee7de841515af963b50f87a50e68897a0eca565 Mon Sep 17 00:00:00 2001 From: Simon Maillard Date: Thu, 16 Oct 2025 09:41:28 +0000 Subject: [PATCH 5/5] typos: exclude GPG keyring test files from spell checking GPG key IDs contain hexadecimal values that trigger false positives in typos (e.g., 'BA' in '1E9377A2BA9EF27F' is flagged as misspelled). These are valid hex values, not typos. --- .typos.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/.typos.toml b/.typos.toml index 747773133..2fbc80f5b 100644 --- a/.typos.toml +++ b/.typos.toml @@ -3,6 +3,7 @@ extend-exclude = [ "tests/facts/apt.SimulateOperationWillChange/upgrade.json", "tests/facts/opkg.OpkgPackages/opkg_packages.json", "tests/words.txt", + "tests/facts/gpg.GpgKeyrings/*.json", # GPG keyring test files contain hex key IDs ] [default]