|
4 | 4 | """TUF client workflow implementation. |
5 | 5 | """ |
6 | 6 |
|
7 | | -import fnmatch |
8 | 7 | import logging |
9 | 8 | import os |
10 | 9 | from typing import Any, Dict, List, Optional |
11 | 10 | from urllib import parse |
12 | 11 |
|
13 | | -from securesystemslib import hash as sslib_hash |
14 | 12 | from securesystemslib import util as sslib_util |
15 | 13 |
|
16 | 14 | from tuf import exceptions |
@@ -361,8 +359,8 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict: |
361 | 359 | # NOTE: This may be a slow operation if there are many |
362 | 360 | # delegated roles. |
363 | 361 | for child_role in child_roles: |
364 | | - child_role_name = _visit_child_role( |
365 | | - child_role, target_filepath |
| 362 | + child_role_name = child_role.visit_child_role( |
| 363 | + target_filepath |
366 | 364 | ) |
367 | 365 |
|
368 | 366 | if child_role.terminating and child_role_name is not None: |
@@ -412,118 +410,6 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict: |
412 | 410 | return {"filepath": target_filepath, "fileinfo": target} |
413 | 411 |
|
414 | 412 |
|
415 | | -def _visit_child_role(child_role: Dict, target_filepath: str) -> str: |
416 | | - """ |
417 | | - <Purpose> |
418 | | - Non-public method that determines whether the given 'target_filepath' |
419 | | - is an allowed path of 'child_role'. |
420 | | -
|
421 | | - Ensure that we explore only delegated roles trusted with the target. The |
422 | | - metadata for 'child_role' should have been refreshed prior to this point, |
423 | | - however, the paths/targets that 'child_role' signs for have not been |
424 | | - verified (as intended). The paths/targets that 'child_role' is allowed |
425 | | - to specify in its metadata depends on the delegating role, and thus is |
426 | | - left to the caller to verify. We verify here that 'target_filepath' |
427 | | - is an allowed path according to the delegated 'child_role'. |
428 | | -
|
429 | | - TODO: Should the TUF spec restrict the repository to one particular |
430 | | - algorithm? Should we allow the repository to specify in the role |
431 | | - dictionary the algorithm used for these generated hashed paths? |
432 | | -
|
433 | | - <Arguments> |
434 | | - child_role: |
435 | | - The delegation targets role object of 'child_role', containing its |
436 | | - paths, path_hash_prefixes, keys, and so on. |
437 | | -
|
438 | | - target_filepath: |
439 | | - The path to the target file on the repository. This will be relative to |
440 | | - the 'targets' (or equivalent) directory on a given mirror. |
441 | | -
|
442 | | - <Exceptions> |
443 | | - None. |
444 | | -
|
445 | | - <Side Effects> |
446 | | - None. |
447 | | -
|
448 | | - <Returns> |
449 | | - If 'child_role' has been delegated the target with the name |
450 | | - 'target_filepath', then we return the role name of 'child_role'. |
451 | | -
|
452 | | - Otherwise, we return None. |
453 | | - """ |
454 | | - |
455 | | - child_role_name = child_role.name |
456 | | - child_role_paths = child_role.paths |
457 | | - child_role_path_hash_prefixes = child_role.path_hash_prefixes |
458 | | - |
459 | | - if child_role_path_hash_prefixes is not None: |
460 | | - target_filepath_hash = _get_filepath_hash(target_filepath) |
461 | | - for child_role_path_hash_prefix in child_role_path_hash_prefixes: |
462 | | - if not target_filepath_hash.startswith(child_role_path_hash_prefix): |
463 | | - continue |
464 | | - |
465 | | - return child_role_name |
466 | | - |
467 | | - elif child_role_paths is not None: |
468 | | - # Is 'child_role_name' allowed to sign for 'target_filepath'? |
469 | | - for child_role_path in child_role_paths: |
470 | | - # A child role path may be an explicit path or glob pattern (Unix |
471 | | - # shell-style wildcards). The child role 'child_role_name' is |
472 | | - # returned if 'target_filepath' is equal to or matches |
473 | | - # 'child_role_path'. Explicit filepaths are also considered |
474 | | - # matches. A repo maintainer might delegate a glob pattern with a |
475 | | - # leading path separator, while the client requests a matching |
476 | | - # target without a leading path separator - make sure to strip any |
477 | | - # leading path separators so that a match is made. |
478 | | - # Example: "foo.tgz" should match with "/*.tgz". |
479 | | - if fnmatch.fnmatch( |
480 | | - target_filepath.lstrip(os.sep), child_role_path.lstrip(os.sep) |
481 | | - ): |
482 | | - logger.debug( |
483 | | - "Child role " |
484 | | - + repr(child_role_name) |
485 | | - + " is allowed to sign for " |
486 | | - + repr(target_filepath) |
487 | | - ) |
488 | | - |
489 | | - return child_role_name |
490 | | - |
491 | | - logger.debug( |
492 | | - "The given target path " |
493 | | - + repr(target_filepath) |
494 | | - + " does not match the trusted path or glob pattern: " |
495 | | - + repr(child_role_path) |
496 | | - ) |
497 | | - continue |
498 | | - |
499 | | - else: |
500 | | - # 'role_name' should have been validated when it was downloaded. |
501 | | - # The 'paths' or 'path_hash_prefixes' fields should not be missing, |
502 | | - # so we raise a format error here in case they are both missing. |
503 | | - raise exceptions.FormatError( |
504 | | - repr(child_role_name) + " " |
505 | | - 'has neither a "paths" nor "path_hash_prefixes". At least' |
506 | | - " one of these attributes must be present." |
507 | | - ) |
508 | | - |
509 | | - return None |
510 | | - |
511 | | - |
512 | | -def _get_filepath_hash(target_filepath, hash_function="sha256"): |
513 | | - """ |
514 | | - Calculate the hash of the filepath to determine which bin to find the |
515 | | - target. |
516 | | - """ |
517 | | - # The client currently assumes the repository (i.e., repository |
518 | | - # tool) uses 'hash_function' to generate hashes and UTF-8. |
519 | | - digest_object = sslib_hash.digest(hash_function) |
520 | | - encoded_target_filepath = target_filepath.encode("utf-8") |
521 | | - digest_object.update(encoded_target_filepath) |
522 | | - target_filepath_hash = digest_object.hexdigest() |
523 | | - |
524 | | - return target_filepath_hash |
525 | | - |
526 | | - |
527 | 413 | def _ensure_trailing_slash(url: str): |
528 | 414 | """Return url guaranteed to end in a slash""" |
529 | 415 | return url if url.endswith("/") else f"{url}/" |
0 commit comments