|
12 | 12 | from . import files |
13 | 13 |
|
14 | 14 |
|
| 15 | +def _install_key_from_src(src: str, dest: str, dearmor: bool, mode: str): |
| 16 | + """Install a GPG key from a file or URL.""" |
| 17 | + if urlparse(src).scheme in ("http", "https"): |
| 18 | + # Remote source: download first, then process |
| 19 | + temp_file = host.get_temp_filename(src) |
| 20 | + |
| 21 | + yield from files.download._inner( |
| 22 | + src=src, |
| 23 | + dest=temp_file, |
| 24 | + ) |
| 25 | + |
| 26 | + # Install the key and clean up temp file |
| 27 | + yield from _install_key_file(temp_file, dest, dearmor, mode) |
| 28 | + |
| 29 | + # Clean up temp file using pyinfra |
| 30 | + yield from files.file._inner( |
| 31 | + path=temp_file, |
| 32 | + present=False, |
| 33 | + ) |
| 34 | + else: |
| 35 | + # Local file: install directly |
| 36 | + yield from _install_key_file(src, dest, dearmor, mode) |
| 37 | + |
| 38 | + |
| 39 | +def _install_key_from_keyserver(keyserver: str, keyid: str | list[str], dest: str, mode: str): |
| 40 | + """Install GPG keys from a keyserver.""" |
| 41 | + if isinstance(keyid, str): |
| 42 | + keyid = [keyid] |
| 43 | + |
| 44 | + joined = " ".join(keyid) |
| 45 | + |
| 46 | + # Create temporary GPG home directory |
| 47 | + temp_dir = f"/tmp/pyinfra-gpg-{host.get_temp_filename('')[-8:]}" |
| 48 | + |
| 49 | + yield from files.directory._inner( |
| 50 | + path=temp_dir, |
| 51 | + mode="0700", # GPG directories should be more restrictive |
| 52 | + present=True, |
| 53 | + ) |
| 54 | + |
| 55 | + # Export GNUPGHOME and fetch keys |
| 56 | + yield f'export GNUPGHOME="{temp_dir}" && gpg --batch --keyserver "{keyserver}" --recv-keys {joined}' # noqa: E501 |
| 57 | + |
| 58 | + # Export keys to destination - always use direct binary export |
| 59 | + # gpg --export produces binary format by default, no dearmoring needed |
| 60 | + yield (f'export GNUPGHOME="{temp_dir}" && gpg --batch --export {joined} > "{dest}"') |
| 61 | + |
| 62 | + # Clean up temporary directory |
| 63 | + yield from files.directory._inner( |
| 64 | + path=temp_dir, |
| 65 | + present=False, |
| 66 | + ) |
| 67 | + |
| 68 | + # Set proper permissions |
| 69 | + yield from files.file._inner( |
| 70 | + path=dest, |
| 71 | + mode=mode, |
| 72 | + present=True, |
| 73 | + ) |
| 74 | + |
| 75 | + |
| 76 | +def _remove_key_from_keyrings(keyid: str | list[str], working_dirs: list[str]): |
| 77 | + """Remove specific keys from all keyrings in specified directories.""" |
| 78 | + if isinstance(keyid, str): |
| 79 | + keyid = [keyid] |
| 80 | + |
| 81 | + # Use the GpgKeyrings fact to find all keyrings in specified directories |
| 82 | + keyrings_info = host.get_fact(GpgKeyrings, directories=working_dirs) |
| 83 | + |
| 84 | + for keyring_path, keyring_data in keyrings_info.items(): |
| 85 | + # Get the keys from the GpgKeyrings fact data |
| 86 | + keys_in_keyring = keyring_data.get("keys", {}) |
| 87 | + |
| 88 | + # Check if any of the target keys exist in this keyring |
| 89 | + keys_to_remove = [] |
| 90 | + for kid in keyid: |
| 91 | + # Handle different key ID formats (short, long, with/without 0x prefix) |
| 92 | + clean_key = kid.replace("0x", "").replace("0X", "").upper() |
| 93 | + |
| 94 | + # Check for exact match or if the key ID is a suffix/prefix of any key |
| 95 | + # in the keyring |
| 96 | + for existing_key_id in keys_in_keyring.keys(): |
| 97 | + if ( |
| 98 | + clean_key == existing_key_id.upper() |
| 99 | + or existing_key_id.upper().endswith(clean_key) |
| 100 | + or existing_key_id.upper().startswith(clean_key) |
| 101 | + ): |
| 102 | + keys_to_remove.append(existing_key_id) |
| 103 | + |
| 104 | + if keys_to_remove: |
| 105 | + # Remove the entire keyring file if any target keys are found |
| 106 | + # This is the safest approach for keyring management |
| 107 | + yield from files.file._inner( |
| 108 | + path=keyring_path, |
| 109 | + present=False, |
| 110 | + ) |
| 111 | + |
| 112 | + |
| 113 | +def _remove_key_from_keyring(keyid: str | list[str], dest: str): |
| 114 | + """Remove specific keys from a specific keyring file.""" |
| 115 | + if isinstance(keyid, str): |
| 116 | + keyid = [keyid] |
| 117 | + |
| 118 | + # Check if the destination keyring exists and contains the target keys |
| 119 | + keyrings_info = host.get_fact(GpgKeyrings, directories=[str(PurePosixPath(dest).parent)]) |
| 120 | + |
| 121 | + if dest in keyrings_info: |
| 122 | + keyring_data = keyrings_info[dest] |
| 123 | + keys_in_keyring = keyring_data.get("keys", {}) |
| 124 | + |
| 125 | + # Check if any of the target keys exist in this keyring |
| 126 | + keys_found = False |
| 127 | + for kid in keyid: |
| 128 | + clean_key = kid.replace("0x", "").replace("0X", "").upper() |
| 129 | + for existing_key_id in keys_in_keyring.keys(): |
| 130 | + # Check for exact match, suffix (short key ID), or prefix match |
| 131 | + if ( |
| 132 | + clean_key == existing_key_id.upper() |
| 133 | + or existing_key_id.upper().endswith(clean_key) |
| 134 | + or existing_key_id.upper().startswith(clean_key) |
| 135 | + ): |
| 136 | + keys_found = True |
| 137 | + break |
| 138 | + if keys_found: |
| 139 | + break |
| 140 | + |
| 141 | + if keys_found: |
| 142 | + # Remove the entire keyring file - safest approach for keyring management |
| 143 | + yield from files.file._inner( |
| 144 | + path=dest, |
| 145 | + present=False, |
| 146 | + ) |
| 147 | + |
| 148 | + |
| 149 | +def _remove_keyring_file(dest: str): |
| 150 | + """Remove an entire keyring file.""" |
| 151 | + yield from files.file._inner( |
| 152 | + path=dest, |
| 153 | + present=False, |
| 154 | + ) |
| 155 | + |
| 156 | + |
| 157 | +def _validate_installation_params( |
| 158 | + src: str | None, keyserver: str | None, keyid: str | list[str] | None, dest: str | None |
| 159 | +): |
| 160 | + """Validate parameters for key installation.""" |
| 161 | + if not src and not keyserver: |
| 162 | + raise OperationError("Either `src` or `keyserver` must be provided for installation") |
| 163 | + |
| 164 | + if keyserver and not keyid: |
| 165 | + raise OperationError("`keyid` must be provided with `keyserver`") |
| 166 | + |
| 167 | + if keyid and not keyserver and not src: |
| 168 | + raise OperationError( |
| 169 | + "When using `keyid` for installation, either `keyserver` or `src` must be provided" |
| 170 | + ) |
| 171 | + |
| 172 | + if dest is None: |
| 173 | + raise OperationError("`dest` must be provided for installation") |
| 174 | + |
| 175 | + |
| 176 | +def _validate_removal_params( |
| 177 | + dest: str | None, keyid: str | list[str] | None, working_dirs: list[str] | None |
| 178 | +): |
| 179 | + """Validate parameters for key removal.""" |
| 180 | + if not dest and not (keyid and working_dirs): |
| 181 | + raise OperationError( |
| 182 | + "For removal, either `dest` or both `keyid` and `working_dirs` must be provided" |
| 183 | + ) |
| 184 | + |
| 185 | + |
15 | 186 | @operation() |
16 | 187 | def key( |
17 | 188 | src: str | None = None, |
@@ -75,196 +246,53 @@ def key( |
75 | 246 | ) |
76 | 247 | """ |
77 | 248 |
|
78 | | - # Validate parameters based on operation type |
79 | | - if present is True: |
80 | | - # For installation, dest is required |
81 | | - if not dest: |
82 | | - raise OperationError("`dest` must be provided for installation") |
83 | | - elif present is False: |
84 | | - # For removal, either dest or (keyid and working_dirs) must be provided |
85 | | - if not dest and not (keyid and working_dirs): |
86 | | - raise OperationError( |
87 | | - "For removal, either `dest` or both `keyid` and `working_dirs` must be provided" |
88 | | - ) |
89 | | - |
90 | | - # For removal, handle different scenarios |
| 249 | + # Handle removal operations |
91 | 250 | if present is False: |
| 251 | + _validate_removal_params(dest, keyid, working_dirs) |
| 252 | + |
92 | 253 | if not dest and keyid: |
93 | 254 | # Remove key(s) from all keyrings found in specified directories |
94 | | - if isinstance(keyid, str): |
95 | | - keyid = [keyid] |
96 | | - |
97 | 255 | if not working_dirs: |
98 | 256 | raise OperationError( |
99 | 257 | "`working_dirs` must be provided when removing keys without `dest`" |
100 | 258 | ) |
101 | | - |
102 | | - # Use the GpgKeyrings fact to find all keyrings in specified directories |
103 | | - keyrings_info = host.get_fact(GpgKeyrings, directories=working_dirs) |
104 | | - |
105 | | - for keyring_path, keyring_data in keyrings_info.items(): |
106 | | - # Get the keys from the GpgKeyrings fact data |
107 | | - keys_in_keyring = keyring_data.get("keys", {}) |
108 | | - |
109 | | - # Check if any of the target keys exist in this keyring |
110 | | - keys_to_remove = [] |
111 | | - for kid in keyid: |
112 | | - # Handle different key ID formats (short, long, with/without 0x prefix) |
113 | | - clean_key = kid.replace("0x", "").replace("0X", "").upper() |
114 | | - |
115 | | - # Check for exact match or if the key ID is a suffix/prefix of any key |
116 | | - # in the keyring |
117 | | - for existing_key_id in keys_in_keyring.keys(): |
118 | | - if ( |
119 | | - clean_key == existing_key_id.upper() |
120 | | - or existing_key_id.upper().endswith(clean_key) |
121 | | - or existing_key_id.upper().startswith(clean_key) |
122 | | - ): |
123 | | - keys_to_remove.append(existing_key_id) |
124 | | - |
125 | | - if keys_to_remove: |
126 | | - # Remove the entire keyring file if any target keys are found |
127 | | - # This is the safest approach for keyring management |
128 | | - yield from files.file._inner( |
129 | | - path=keyring_path, |
130 | | - present=False, |
131 | | - ) |
132 | | - |
133 | | - return |
| 259 | + yield from _remove_key_from_keyrings(keyid, working_dirs) |
134 | 260 |
|
135 | 261 | elif dest and keyid: |
136 | 262 | # Remove specific key(s) from a specific keyring file |
137 | | - if isinstance(keyid, str): |
138 | | - keyid = [keyid] |
139 | | - |
140 | | - # Check if the destination keyring exists and contains the target keys |
141 | | - keyrings_info = host.get_fact( |
142 | | - GpgKeyrings, directories=[str(PurePosixPath(dest).parent)] |
143 | | - ) |
144 | | - |
145 | | - if dest in keyrings_info: |
146 | | - keyring_data = keyrings_info[dest] |
147 | | - keys_in_keyring = keyring_data.get("keys", {}) |
148 | | - |
149 | | - # Check if any of the target keys exist in this keyring |
150 | | - keys_found = False |
151 | | - for kid in keyid: |
152 | | - clean_key = kid.replace("0x", "").replace("0X", "").upper() |
153 | | - for existing_key_id in keys_in_keyring.keys(): |
154 | | - # Check for exact match, suffix (short key ID), or prefix match |
155 | | - if ( |
156 | | - clean_key == existing_key_id.upper() |
157 | | - or existing_key_id.upper().endswith(clean_key) |
158 | | - or existing_key_id.upper().startswith(clean_key) |
159 | | - ): |
160 | | - keys_found = True |
161 | | - break |
162 | | - if keys_found: |
163 | | - break |
164 | | - |
165 | | - if keys_found: |
166 | | - # Remove the entire keyring file - safest approach for keyring management |
167 | | - yield from files.file._inner( |
168 | | - path=dest, |
169 | | - present=False, |
170 | | - ) |
171 | | - return |
| 263 | + yield from _remove_key_from_keyring(keyid, dest) |
172 | 264 |
|
173 | 265 | elif dest and not keyid: |
174 | 266 | # Remove entire keyring file |
175 | | - yield from files.file._inner( |
176 | | - path=dest, |
177 | | - present=False, |
178 | | - ) |
179 | | - return |
| 267 | + yield from _remove_keyring_file(dest) |
180 | 268 |
|
181 | 269 | else: |
182 | 270 | raise OperationError("Invalid parameters for removal operation") |
183 | 271 |
|
184 | | - # For installation, validate required parameters |
185 | | - if not src and not keyserver: |
186 | | - raise OperationError("Either `src` or `keyserver` must be provided for installation") |
187 | | - |
188 | | - if keyserver and not keyid: |
189 | | - raise OperationError("`keyid` must be provided with `keyserver`") |
| 272 | + return |
190 | 273 |
|
191 | | - if keyid and not keyserver and not src: |
192 | | - raise OperationError( |
193 | | - "When using `keyid` for installation, either `keyserver` or `src` must be provided" |
194 | | - ) |
| 274 | + # Handle installation operations |
| 275 | + _validate_installation_params(src, keyserver, keyid, dest) |
195 | 276 |
|
196 | | - # For installation (present=True), ensure destination directory exists |
197 | | - if dest is None: |
198 | | - raise OperationError("dest is required for installation") |
| 277 | + # After validation, we know dest is not None for installation |
| 278 | + assert dest is not None, "dest should not be None after validation" |
199 | 279 |
|
| 280 | + # Ensure destination directory exists |
200 | 281 | dest_dir = str(PurePosixPath(dest).parent) |
201 | 282 | yield from files.directory._inner( |
202 | 283 | path=dest_dir, |
203 | 284 | mode="0755", |
204 | 285 | present=True, |
205 | 286 | ) |
206 | 287 |
|
207 | | - # --- src branch: install a key from URL or local file --- |
| 288 | + # Install from source (file or URL) |
208 | 289 | if src: |
209 | | - if urlparse(src).scheme in ("http", "https"): |
210 | | - # Remote source: download first, then process |
211 | | - temp_file = host.get_temp_filename(src) |
212 | | - |
213 | | - yield from files.download._inner( |
214 | | - src=src, |
215 | | - dest=temp_file, |
216 | | - ) |
217 | | - |
218 | | - # Install the key and clean up temp file |
219 | | - yield from _install_key_file(temp_file, dest, dearmor, mode) |
220 | | - |
221 | | - # Clean up temp file using pyinfra |
222 | | - yield from files.file._inner( |
223 | | - path=temp_file, |
224 | | - present=False, |
225 | | - ) |
226 | | - else: |
227 | | - # Local file: install directly |
228 | | - yield from _install_key_file(src, dest, dearmor, mode) |
| 290 | + yield from _install_key_from_src(src, dest, dearmor, mode) |
229 | 291 |
|
230 | | - # --- keyserver branch: fetch keys by ID --- |
| 292 | + # Install from keyserver |
231 | 293 | if keyserver: |
232 | | - if keyid is None: |
233 | | - raise OperationError("`keyid` must be provided with `keyserver`") |
234 | | - |
235 | | - if isinstance(keyid, str): |
236 | | - keyid = [keyid] |
237 | | - |
238 | | - joined = " ".join(keyid) |
239 | | - |
240 | | - # Create temporary GPG home directory |
241 | | - temp_dir = f"/tmp/pyinfra-gpg-{host.get_temp_filename('')[-8:]}" |
242 | | - |
243 | | - yield from files.directory._inner( |
244 | | - path=temp_dir, |
245 | | - mode="0700", # GPG directories should be more restrictive |
246 | | - present=True, |
247 | | - ) |
248 | | - |
249 | | - # Export GNUPGHOME and fetch keys |
250 | | - yield f'export GNUPGHOME="{temp_dir}" && gpg --batch --keyserver "{keyserver}" --recv-keys {joined}' # noqa: E501 |
251 | | - |
252 | | - # Export keys to destination - always use direct binary export |
253 | | - # gpg --export produces binary format by default, no dearmoring needed |
254 | | - yield (f'export GNUPGHOME="{temp_dir}" && gpg --batch --export {joined} > "{dest}"') |
255 | | - |
256 | | - # Clean up temporary directory |
257 | | - yield from files.directory._inner( |
258 | | - path=temp_dir, |
259 | | - present=False, |
260 | | - ) |
261 | | - |
262 | | - # Set proper permissions |
263 | | - yield from files.file._inner( |
264 | | - path=dest, |
265 | | - mode=mode, |
266 | | - present=True, |
267 | | - ) |
| 294 | + assert keyid is not None, "keyid should not be None after validation" |
| 295 | + yield from _install_key_from_keyserver(keyserver, keyid, dest, mode) |
268 | 296 |
|
269 | 297 |
|
270 | 298 | @operation() |
|
0 commit comments